/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.ChoreService.CHORE_SERVICE_INITIAL_POOL_SIZE;
import static org.apache.hadoop.hbase.ChoreService.DEFAULT_CHORE_SERVICE_INITIAL_POOL_SIZE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.mutable.MutableFloat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionServerRegistry;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.ServerConnectionUtils;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends Thread
  implements RegionServerServices, LastSequenceId, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);

  int unitMB = 1024 * 1024;
  int unitKB = 1024;

  /**
   * For testing only! Set to true to skip notifying region assignment to the master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to the action currently in progress for that region. The boolean value
   * indicates: true if an open-region action is in progress, false if a close-region action is in
   * progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();
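  // Note on the Guava cache above (illustrative): an entry is evicted 600 seconds after its last
  // access, which bounds memory while still letting a retried open/close RPC for a recently
  // executed procedure be recognized as a duplicate.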

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services. Initialized at server startup and closed when
   * server shuts down. Clients must never close it explicitly. Clients hosted by this Server should
   * make use of this clusterConnection rather than create their own; if they create their own,
   * there is no way for the hosting server to shut down ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
   * will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;

  // Instance of the hbase executor service.
  protected ExecutorService executorService;

  private volatile boolean dataFsOk;
  private HFileSystem dataFs;
  private HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;
  // Only for testing
  private boolean isShutdownHookInstalled = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private AtomicBoolean abortRequested;
  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds for safety
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;
  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path dataRootDir;
  private Path walRootDir;

  private final int threadWakeFrequency;
  final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is the name of the webapp and the attribute name used to stuff this instance
  // into the web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private ChoreService choreService;

  /**
   * Check for compactions requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  /**
   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
   * entries. Used for serving ClientMetaService.
   */
  private final MetaRegionLocationCache metaRegionLocationCache;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private NamedQueueServiceChore namedQueueServiceChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the master passes us, port,
   * and server startcode. Gets set after registration against Master.
   */
  protected ServerName serverName;

  /**
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";

  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
   * are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are:
   * <ul>
   * <li>WAL recovery is optimized, and under high load we won't read nearly nonce-timeout worth of
   * past records. If we don't read the records, we don't read and recover the nonces. Some WALs
   * within nonce-timeout at recovery may not even be present due to rolling/cleanup.</li>
   * <li>There's no WAL recovery during normal region move, so nonces will not be transferred.</li>
   * </ul>
   * We can have a separate additional "Nonce WAL". It will just contain bunch of numbers and won't
   * be flushed on main path - because WAL itself also contains nonces, if we only flush it before
   * memstore flush, for a given nonce we will either see it in the WAL (if it was never flushed to
   * disk, it will be part of recovery), or we'll see it as part of the nonce log (or both
   * occasionally, which doesn't matter). Nonce log file can be deleted after the latest nonce in it
   * expired. It can also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  private CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * Provide online slow log responses from ringbuffer
   */
  private NamedQueueRecorder namedQueueRecorder = null;

  private BootstrapNodeManager bootstrapNodeManager;

  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
   * just come up and make do without a Master to talk to: e.g. in test, or when HRegionServer is
   * doing something other than its usual duties: e.g. as a hollowed-out host whose only purpose is
   * as a Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  /*
   * Chore that creates replication marker rows.
   */
  private ReplicationMarkerChore replicationMarkerChore;

  // A timer that submits requests to the PrefetchExecutor
  private PrefetchExecutorNotifier prefetchExecutorNotifier;

  /**
   * Starts an HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in the constructor. Defer till after we register with the
   * Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.startcode = EnvironmentEdgeManager.currentTime();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      this.abortRequested = new AtomicBoolean(false);
      this.stopped = false;

      this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);

      // if use-ip is enabled, we will use ip to expose Master/RS service for client,
      // see HBASE-27304 for details.
      boolean useIp = conf.getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
        HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
      String isaHostName =
        useIp ? rpcServices.isa.getAddress().getHostAddress() : rpcServices.isa.getHostName();
      String hostName =
        StringUtils.isBlank(useThisHostnameInstead) ? isaHostName : useThisHostnameInstead;
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf,
        clusterConnection == null ? null : clusterConnection.getConnectionMetrics());

      // login the zookeeper client principal (if using security)
      ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
        this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

      // no need to instantiate block cache and mob file cache when the master does not carry
      // tables
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      initializeFileSystem();

      this.configurationManager = new ConfigurationManager();
      setupSignalHandlers();

      // Some unit tests don't need a cluster, so no zookeeper at all
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
        canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (
          conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
        ) {
          this.csm = new ZkCoordinatedStateManager(this);
        }

        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between HRS
      // and M (An M IS A HRS now). Need to refactor so there is less duplication between M and its
      // super class HRS. Master expects Constructor to put up web servers. Ugh. TODO.
      int choreServiceInitialSize =
        conf.getInt(CHORE_SERVICE_INITIAL_POOL_SIZE, DEFAULT_CHORE_SERVICE_INITIAL_POOL_SIZE);
      this.choreService = new ChoreService(getName(), choreServiceInitialSize, true);
      this.executorService = new ExecutorService(getName());
      putUpWebUI();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  // HMaster should override this method to load the specific config for the master
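  // For illustration (values hypothetical): setting both of the following is rejected below,
  // since a fixed hostname and "resolve my own hostname locally" contradict each other:
  //   hbase.unsafe.regionserver.hostname = rs1.example.com
  //   hbase.unsafe.regionserver.hostname.disable.master.reversedns = true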
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }

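  // On POSIX systems, SIGHUP triggers a reload of the on-disk configuration; operationally that
  // can be driven with, e.g. (illustrative): kill -HUP <regionserver pid>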
  private void setupSignalHandlers() {
    if (!SystemUtils.IS_OS_WINDOWS) {
      HBasePlatformDependent.handle("HUP", (number, name) -> {
        try {
          updateConfiguration();
        } catch (IOException e) {
          LOG.error("Problem while reloading configuration", e);
        }
      });
    }
  }

  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }

  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in HBase? If HBase
    // checksum verification is enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    String walDirUri = CommonFSUtils.getDirUri(this.conf,
      new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
    // set WAL's uri
    if (walDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, walDirUri);
    }
    // init the WALFs
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    String rootDirUri =
      CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
    if (rootDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, rootDirUri);
    }
    // init the filesystem
    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
      !canUpdateTableDescriptor(), cacheTableDescriptor());
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  /**
   * Wait for an active Master. See the override in the HMaster subclass for how it is used.
   */
  protected void waitForMasterActive() {
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

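  /**
   * Registers a protobuf {@code Service} to be exposed as a region-server-level coprocessor
   * endpoint. A minimal, illustrative sketch only ({@code MyRpcService} is a hypothetical
   * generated service implementation, not part of HBase):
   *
   * <pre>
   * // e.g. from coprocessor setup code holding a RegionServerServices reference:
   * com.google.protobuf.Service service = new MyRpcService();
   * if (!rsServices.registerService(service)) {
   *   // a service with the same name was already registered; only one instance is allowed
   * }
   * </pre>
   */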
  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single service name
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor service " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
    }
    return true;
  }

  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that, when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server, bypassing RPC. Can be used for both
    // local and remote invocations.
    return ServerConnectionUtils.createShortCircuitConnection(conf, userProvider.getCurrent(),
      serverName, rpcServices, rpcServices, new RegionServerRegistry(this));
  }

  /**
   * Run test on configured codecs to make sure supporting libs are in place.
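   * <p>
   * For illustration, a hypothetical hbase-site.xml entry that would make this check verify two
   * codecs at startup (the key is the real one; the codec list is just an example):
   *
   * <pre>
   * &lt;property&gt;
   *   &lt;name&gt;hbase.regionserver.codecs&lt;/name&gt;
   *   &lt;value&gt;snappy,lz4&lt;/value&gt;
   * &lt;/property&gt;
   * </pre>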
   * @param c configuration object
   * @throws IOException if compression test fails for any regionserver codec
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
    }
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      if (!(this instanceof HMaster)) {
        bootstrapNodeManager = new BootstrapNodeManager(clusterConnection, masterAddressTracker);
      }
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId,
        new InetSocketAddress(this.rpcServices.isa.getAddress(), 0),
        clusterConnection.getConnectionMetrics(), Collections.emptyMap());
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or the process will stick around forever since the server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed.  Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up connection to zk ensemble, then wait until a master is available for this cluster,
   * and after that wait until the cluster 'up' flag has been set. This is the order in which the
   * master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve the clusterId. Since the cluster status is now up, the ID should already have
      // been set by the HMaster.
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking if the region
   * server is shut down
   * @param tracker znode tracker to use
   * @throws IOException          any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns True if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

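  // For illustration (the period value is hypothetical), the marker chore below is driven purely
  // by configuration:
  //   conf.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
  //   conf.setInt(REPLICATION_MARKER_CHORE_DURATION_KEY, 30_000); // emit a marker every 30s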
  private void initializeReplicationMarkerChore() {
    boolean replicationMarkerEnabled =
      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
    // Create the chore only if the replication marker feature is enabled.
    if (replicationMarkerEnabled) {
      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
      replicationMarkerChore = new ReplicationMarkerChore(this, this, period);
    }
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      installShutdownHook();
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the cluster-up flag is down or hdfs went wacky. Once registered successfully, go ahead
        // and start up all Services. Use RetryCounter to get backoff in case Master is struggling
        // to come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested.get());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // while
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      if (this.infoServer != null) {
        LOG.info("Stopping infoServer");
        try {
          this.infoServer.stop();
        } catch (Exception e) {
          LOG.error("Failed to stop infoServer", e);
        }
      }
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }

      if (this.clusterConnection != null && !clusterConnection.isClosed()) {
        try {
          this.clusterConnection.close();
        } catch (IOException e) {
          // Although the {@link Closeable} interface throws an {@link
          // IOException}, in reality, the implementation would never do that.
          LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
        }
      }

      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // The dataFsOk flag may have been cleared if closing regions threw an exception.
1263      if (this.dataFsOk) {
1264        shutdownWAL(!abortRequested.get());
1265      }
1266
1267      // Make sure the proxy is down.
1268      if (this.rssStub != null) {
1269        this.rssStub = null;
1270      }
1271      if (this.lockStub != null) {
1272        this.lockStub = null;
1273      }
1274      if (this.rpcClient != null) {
1275        this.rpcClient.close();
1276      }
1277      if (this.leaseManager != null) {
1278        this.leaseManager.close();
1279      }
1280      if (this.pauseMonitor != null) {
1281        this.pauseMonitor.stop();
1282      }
1283
1284      if (!killed) {
1285        stopServiceThreads();
1286      }
1287
1288      if (this.rpcServices != null) {
1289        this.rpcServices.stop();
1290      }
1291
1292      try {
1293        deleteMyEphemeralNode();
1294      } catch (KeeperException.NoNodeException nn) {
1295        // pass
1296      } catch (KeeperException e) {
1297        LOG.warn("Failed deleting my ephemeral node", e);
1298      }
1299      // We may have failed to delete the znode at the previous step, but
1300      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1301      ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1302
1303      if (this.zooKeeper != null) {
1304        this.zooKeeper.close();
1305      }
1306      this.shutDown = true;
1307      LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
1308      span.setStatus(StatusCode.OK);
1309    } finally {
1310      span.end();
1311    }
1312  }
1313
1314  /**
   * This method is called when HMaster and HRegionServer are started. See HBASE-26977 for
   * details.
1317   */
1318  private void installShutdownHook() {
1319    ShutdownHook.install(conf, dataFs, this, Thread.currentThread());
1320    isShutdownHookInstalled = true;
1321  }
1322
1323  /**
1324   * This method is used for testing.
1325   */
1326  public boolean isShutdownHookInstalled() {
1327    return isShutdownHookInstalled;
1328  }
1329
1330  private boolean containsMetaTableRegions() {
1331    return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
1332  }
1333
1334  private boolean areAllUserRegionsOffline() {
1335    if (getNumberOfOnlineRegions() > 2) {
1336      return false;
1337    }
1338    boolean allUserRegionsOffline = true;
1339    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1340      if (!e.getValue().getRegionInfo().isMetaRegion()) {
1341        allUserRegionsOffline = false;
1342        break;
1343      }
1344    }
1345    return allUserRegionsOffline;
1346  }
1347
1348  /** Returns Current write count for all online regions. */
1349  private long getWriteRequestCount() {
1350    long writeCount = 0;
1351    for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1352      writeCount += e.getValue().getWriteRequestsCount();
1353    }
1354    return writeCount;
1355  }
1356
1357  @InterfaceAudience.Private
1358  protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1359    throws IOException {
1360    RegionServerStatusService.BlockingInterface rss = rssStub;
1361    if (rss == null) {
1362      // the current server could be stopping.
1363      return;
1364    }
1365    ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1366    final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport");
1367    try (Scope ignored = span.makeCurrent()) {
1368      RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1369      request.setServer(ProtobufUtil.toServerName(this.serverName));
1370      request.setLoad(sl);
1371      rss.regionServerReport(null, request.build());
1372      span.setStatus(StatusCode.OK);
1373    } catch (ServiceException se) {
1374      IOException ioe = ProtobufUtil.getRemoteException(se);
1375      if (ioe instanceof YouAreDeadException) {
1376        // This will be caught and handled as a fatal error in run()
1377        TraceUtil.setError(span, ioe);
1378        throw ioe;
1379      }
1380      if (rssStub == rss) {
1381        rssStub = null;
1382      }
1383      TraceUtil.setError(span, se);
1384      // Couldn't connect to the master, get location from zk and reconnect
1385      // Method blocks until new master is found or we are stopped
1386      createRegionServerStatusStub(true);
1387    } finally {
1388      span.end();
1389    }
1390  }
1391
1392  /**
1393   * Reports the given map of Regions and their size on the filesystem to the active Master.
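   * <p>
   * A hypothetical caller sketch (the {@code chore} object and its {@code pause} method are
   * illustrative, not part of this class):
   *
   * <pre>{@code
   * if (!regionServer.reportRegionSizesForQuotas(regionSizeStore)) {
   *   chore.pause(); // hypothetical: back off until the master supports the report RPC
   * }
   * }</pre>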
1394   * @param regionSizeStore The store containing region sizes
1395   * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise
1396   */
1397  public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) {
1398    RegionServerStatusService.BlockingInterface rss = rssStub;
1399    if (rss == null) {
1400      // the current server could be stopping.
1401      LOG.trace("Skipping Region size report to HMaster as stub is null");
1402      return true;
1403    }
1404    try {
1405      buildReportAndSend(rss, regionSizeStore);
1406    } catch (ServiceException se) {
1407      IOException ioe = ProtobufUtil.getRemoteException(se);
1408      if (ioe instanceof PleaseHoldException) {
1409        LOG.trace("Failed to report region sizes to Master because it is initializing."
1410          + " This will be retried.", ioe);
1411        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
1412        return true;
1413      }
1414      if (rssStub == rss) {
1415        rssStub = null;
1416      }
1417      createRegionServerStatusStub(true);
1418      if (ioe instanceof DoNotRetryIOException) {
1419        DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1420        if (doNotRetryEx.getCause() != null) {
1421          Throwable t = doNotRetryEx.getCause();
1422          if (t instanceof UnsupportedOperationException) {
1423            LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1424            return false;
1425          }
1426        }
1427      }
1428      LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1429    }
1430    return true;
1431  }
1432
1433  /**
1434   * Builds the region size report and sends it to the master. Upon successful sending of the
1435   * report, the region sizes that were sent are marked as sent.
1436   * @param rss             The stub to send to the Master
1437   * @param regionSizeStore The store containing region sizes
1438   */
1439  private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss,
1440    RegionSizeStore regionSizeStore) throws ServiceException {
1441    RegionSpaceUseReportRequest request =
1442      buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore));
1443    rss.reportRegionSpaceUse(null, request);
1444    // Record the number of size reports sent
1445    if (metricsRegionServer != null) {
1446      metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size());
1447    }
1448  }
1449
1450  /**
1451   * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map.
1452   * @param regionSizes The size in bytes of regions
1453   * @return The corresponding protocol buffer message.
1454   */
1455  RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) {
1456    RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1457    for (Entry<RegionInfo, RegionSize> entry : regionSizes) {
1458      request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize()));
1459    }
1460    return request.build();
1461  }
1462
1463  /**
1464   * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf
1465   * message.
1466   * @param regionInfo  The RegionInfo
1467   * @param sizeInBytes The size in bytes of the Region
1468   * @return The protocol buffer
1469   */
1470  RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) {
1471    return RegionSpaceUse.newBuilder()
1472      .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo)))
1473      .setRegionSize(Objects.requireNonNull(sizeInBytes)).build();
1474  }
1475
1476  private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1477    throws IOException {
    // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests
    // per second and other metrics. As long as metrics are part of ServerLoad it's best to use
    // the wrapper to compute those numbers in one place.
    // In the long term most of these should be moved off of ServerLoad and the heartbeat.
    // Instead they should be stored in an HBase table so that external visibility into HBase is
    // improved; additionally, the load balancer will be able to take advantage of a more complete
    // history.
1485    MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1486    Collection<HRegion> regions = getOnlineRegionsLocalContext();
1487    long usedMemory = -1L;
1488    long maxMemory = -1L;
1489    final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage();
1490    if (usage != null) {
1491      usedMemory = usage.getUsed();
1492      maxMemory = usage.getMax();
1493    }
1494
1495    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
1496    serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1497    serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount());
1498    serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024));
1499    serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024));
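    // Note: if safeGetHeapMemoryUsage() returned null, usedMemory and maxMemory stay at -1 and
    // the integer divisions above truncate toward zero, so the reported MB values become 0.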
1500    Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1501    Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder();
1502    for (String coprocessor : coprocessors) {
1503      serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1504    }
1505    RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1506    RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1507    for (HRegion region : regions) {
1508      if (region.getCoprocessorHost() != null) {
1509        Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1510        for (String regionCoprocessor : regionCoprocessors) {
1511          serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build());
1512        }
1513      }
1514      serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1515      for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1516        .getCoprocessors()) {
1517        serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1518      }
1519    }
1520    computeIfPersistentBucketCache(bc -> {
1521      bc.getRegionCachedInfo().forEach((regionName, prefetchSize) -> {
1522        serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB));
1523      });
1524    });
1525    serverLoad.setReportStartTime(reportStartTime);
1526    serverLoad.setReportEndTime(reportEndTime);
1527    if (this.infoServer != null) {
1528      serverLoad.setInfoServerPort(this.infoServer.getPort());
1529    } else {
1530      serverLoad.setInfoServerPort(-1);
1531    }
1532    MetricsUserAggregateSource userSource =
1533      metricsRegionServer.getMetricsUserAggregate().getSource();
1534    if (userSource != null) {
1535      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
1536      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
1537        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
1538      }
1539    }
1540
    // For the replication load we only need to query one service; either source or sink
    // will return the same info.
1543    ReplicationSourceService rsources = getReplicationSourceService();
1544    if (rsources != null) {
1545      // always refresh first to get the latest value
1546      ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1547      if (rLoad != null) {
1548        serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1549        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
1550          .getReplicationLoadSourceEntries()) {
1551          serverLoad.addReplLoadSource(rLS);
1552        }
1553
1554      }
1555    }
1556
1557    TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask
1558      .newBuilder().setDescription(task.getDescription())
1559      .setStatus(task.getStatus() != null ? task.getStatus() : "")
1560      .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name()))
1561      .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build()));
1562
1563    return serverLoad.build();
1564  }
1565
1566  private String getOnlineRegionsAsPrintableString() {
1567    StringBuilder sb = new StringBuilder();
1568    for (Region r : this.onlineRegions.values()) {
1569      if (sb.length() > 0) {
1570        sb.append(", ");
1571      }
1572      sb.append(r.getRegionInfo().getEncodedName());
1573    }
1574    return sb.toString();
1575  }
1576
1577  /**
   * Wait on all regions to close.
1579   */
1580  private void waitOnAllRegionsToClose(final boolean abort) {
1581    // Wait till all regions are closed before going out.
1582    int lastCount = -1;
1583    long previousLogTime = 0;
1584    Set<String> closedRegions = new HashSet<>();
1585    boolean interrupted = false;
1586    try {
1587      while (!onlineRegions.isEmpty()) {
1588        int count = getNumberOfOnlineRegions();
1589        // Only print a message if the count of regions has changed.
1590        if (count != lastCount) {
1591          // Log every second at most
1592          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
1593            previousLogTime = EnvironmentEdgeManager.currentTime();
1594            lastCount = count;
1595            LOG.info("Waiting on " + count + " regions to close");
            // Only print out the regions still closing if there are few of them; otherwise we
            // would swamp the log.
1598            if (count < 10 && LOG.isDebugEnabled()) {
1599              LOG.debug("Online Regions=" + this.onlineRegions);
1600            }
1601          }
1602        }
        // Ensure all user regions have been sent a close. This protects against the case where
        // an open comes in after we start iterating over onlineRegions to close all user
        // regions.
1606        for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1607          RegionInfo hri = e.getValue().getRegionInfo();
1608          if (
1609            !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
1610              && !closedRegions.contains(hri.getEncodedName())
1611          ) {
1612            closedRegions.add(hri.getEncodedName());
            // Close the region, ignoring errors; pass the abort flag through.
1614            closeRegionIgnoreErrors(hri, abort);
1615          }
1616        }
        // If no regions are in transition, we can stop waiting now.
1618        if (this.regionsInTransitionInRS.isEmpty()) {
1619          if (!onlineRegions.isEmpty()) {
            LOG.info("Exiting even though online regions are not empty,"
              + " because some regions failed to close");
1622          }
1623          break;
1624        } else {
1625          LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream()
1626            .map(e -> Bytes.toString(e)).collect(Collectors.joining(", ")));
1627        }
1628        if (sleepInterrupted(200)) {
1629          interrupted = true;
1630        }
1631      }
1632    } finally {
1633      if (interrupted) {
1634        Thread.currentThread().interrupt();
1635      }
1636    }
1637  }
1638
1639  private static boolean sleepInterrupted(long millis) {
1640    boolean interrupted = false;
1641    try {
1642      Thread.sleep(millis);
1643    } catch (InterruptedException e) {
1644      LOG.warn("Interrupted while sleeping");
1645      interrupted = true;
1646    }
1647    return interrupted;
1648  }
1649
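  // A summary of the close-vs-shutdown distinction as used by the exit path above: on a clean
  // stop we close() the factory so WALs are finished cleanly, while on abort we only shutdown()
  // the writers and leave any needed log splitting to recovery (see the
  // shutdownWAL(!abortRequested.get()) call above).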
1650  private void shutdownWAL(final boolean close) {
1651    if (this.walFactory != null) {
1652      try {
1653        if (close) {
1654          walFactory.close();
1655        } else {
1656          walFactory.shutdown();
1657        }
1658      } catch (Throwable e) {
1659        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1660        LOG.error("Shutdown / close of WAL failed: " + e);
1661        LOG.debug("Shutdown / close exception details:", e);
1662      }
1663    }
1664  }
1665
1666  /**
   * Returns the NamedQueueRecorder used to add different logs to the ring buffer.
1668   */
1669  public NamedQueueRecorder getNamedQueueRecorder() {
1670    return this.namedQueueRecorder;
1671  }
1672
  /**
   * Run init. Sets up WAL and starts up all server threads.
   * @param c Extra configuration.
   */
1677  protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
1678    throws IOException {
1679    try {
1680      boolean updateRootDir = false;
1681      for (NameStringPair e : c.getMapEntriesList()) {
1682        String key = e.getName();
1683        // The hostname the master sees us as.
1684        if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
1685          String hostnameFromMasterPOV = e.getValue();
1686          this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
1687            rpcServices.getSocketAddress().getPort(), this.startcode);
1688          String expectedHostName = rpcServices.getSocketAddress().getHostName();
          // If the Master's use-ip setting is enabled, the RegionServer effectively runs with
          // use-ip as well even if it is explicitly disabled, so we compare the RegionServer's IP
          // with the hostname passed by the Master; see HBASE-27304 for details.
1692          if (
1693            StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent()
1694              && InetAddresses.isInetAddress(getActiveMaster().get().getHostname())
1695          ) {
1696            expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress();
1697          }
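          // Illustrative example: if the active master registered itself as "10.0.0.5" (an IP,
          // implying use-ip is on) while our socket hostname is "rs1.example.com", we compare the
          // master-supplied name against our IP rather than our hostname.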
          boolean isHostnameConsistent = StringUtils.isBlank(useThisHostnameInstead)
            ? hostnameFromMasterPOV.equals(expectedHostName)
            : hostnameFromMasterPOV.equals(useThisHostnameInstead);
          if (!isHostnameConsistent) {
1702            String msg = "Master passed us a different hostname to use; was="
1703              + (StringUtils.isBlank(useThisHostnameInstead)
1704                ? rpcServices.getSocketAddress().getHostName()
1705                : this.useThisHostnameInstead)
1706              + ", but now=" + hostnameFromMasterPOV;
1707            LOG.error(msg);
1708            throw new IOException(msg);
1709          }
1710          continue;
1711        }
1712
1713        String value = e.getValue();
1714        if (key.equals(HConstants.HBASE_DIR)) {
1715          if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
1716            updateRootDir = true;
1717          }
1718        }
1719
1720        if (LOG.isDebugEnabled()) {
1721          LOG.debug("Config from master: " + key + "=" + value);
1722        }
1723        this.conf.set(key, value);
1724      }
1725      // Set our ephemeral znode up in zookeeper now we have a name.
1726      createMyEphemeralNode();
1727
1728      if (updateRootDir) {
        // Reinitialize the file system using the fs.defaultFS and hbase.rootdir configs passed
        // by the master.
1730        initializeFileSystem();
1731      }
1732
1733      // hack! Maps DFSClient => RegionServer for logs. HDFS made this
1734      // config param for task trackers, but we can piggyback off of it.
1735      if (this.conf.get("mapreduce.task.attempt.id") == null) {
1736        this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString());
1737      }
1738
      // Save it in a file; this lets us detect later whether we crashed.
1740      ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());
1741
1742      // This call sets up an initialized replication and WAL. Later we start it up.
1743      setupWALAndReplication();
      // Initialize here rather than in the constructor, now that the thread name has been set.
1745      final MetricsTable metricsTable =
1746        new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
1747      this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this);
1748      this.metricsRegionServer =
1749        new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable);
1750      // Now that we have a metrics source, start the pause monitor
1751      this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
1752      pauseMonitor.start();
1753
1754      // There is a rare case where we do NOT want services to start. Check config.
1755      if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) {
1756        startServices();
1757      }
      // Here we start up the replication service that was initialized above. TODO: reconcile
      // these two steps.
1760      startReplicationService();
1761
1762      // Set up ZK
1763      LOG.info(
1764        "Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x"
1765          + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
1766
      // Wake up anyone waiting for this server to come online.
1768      synchronized (online) {
1769        online.set(true);
1770        online.notifyAll();
1771      }
1772    } catch (Throwable e) {
1773      stop("Failed initialization");
1774      throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed");
1775    } finally {
1776      sleeper.skipSleepCycle();
1777    }
1778  }
1779
1780  protected void initializeMemStoreChunkCreator() {
1781    if (MemStoreLAB.isEnabled(conf)) {
1782      // MSLAB is enabled. So initialize MemStoreChunkPool
1783      // By this time, the MemstoreFlusher is already initialized. We can get the global limits from
1784      // it.
1785      Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
1786      long globalMemStoreSize = pair.getFirst();
1787      boolean offheap = this.regionServerAccounting.isOffheap();
      // When the off-heap memstore is in use, give the chunk pool the full area.
1789      float poolSizePercentage = offheap
1790        ? 1.0F
1791        : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
1792      float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY,
1793        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
1794      int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
1795      float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY,
1796        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1797      // init the chunkCreator
1798      ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
1799        initialCountPercentage, this.hMemManager, indexChunkSizePercent);
1800    }
1801  }
1802
1803  private void startHeapMemoryManager() {
1804    if (this.blockCache != null) {
1805      this.hMemManager =
1806        new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
1807      this.hMemManager.start(getChoreService());
1808    }
1809  }
1810
1811  private void createMyEphemeralNode() throws KeeperException {
1812    RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1813    rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1814    rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1815    byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
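    // The payload is the PB magic prefix followed by the serialized RegionServerInfo. A
    // reader-side sketch (assuming the standard ProtobufUtil and protobuf parser helpers):
    //   int off = ProtobufUtil.lengthOfPBMagic();
    //   RegionServerInfo info = RegionServerInfo.parser().parseFrom(data, off, data.length - off);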
1816    ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data);
1817  }
1818
1819  private void deleteMyEphemeralNode() throws KeeperException {
1820    ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1821  }
1822
1823  @Override
1824  public RegionServerAccounting getRegionServerAccounting() {
1825    return regionServerAccounting;
1826  }
1827
  // Round the size to KB or MB.
  // A trick here is that if the sizeInBytes is less than sizeUnit, we round the size to 1
  // instead of 0 if it is not 0, to avoid some schedulers thinking the region has no data. See
  // HBASE-26340 for more details on why this is important.
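  // Worked examples (illustrative, assuming sizeUnit = unitMB = 1024 * 1024):
  //   roundSize(0, unitMB)                -> 0 (truly empty stays 0)
  //   roundSize(512 * 1024, unitMB)       -> 1 (non-empty but sub-unit rounds up to 1)
  //   roundSize(5L * 1024 * 1024, unitMB) -> 5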
  private static int roundSize(long sizeInBytes, int sizeUnit) {
    if (sizeInBytes == 0) {
      return 0;
    } else if (sizeInBytes < sizeUnit) {
      return 1;
    } else {
      return (int) Math.min(sizeInBytes / sizeUnit, Integer.MAX_VALUE);
    }
  }
1841
1842  private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) {
1843    if (blockCache instanceof CombinedBlockCache) {
1844      BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache();
1845      if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) {
1846        computation.accept((BucketCache) l2);
1847      }
1848    }
1849  }
1850
1851  /**
1852   * @param r               Region to get RegionLoad for.
1853   * @param regionLoadBldr  the RegionLoad.Builder, can be null
1854   * @param regionSpecifier the RegionSpecifier.Builder, can be null
1855   * @return RegionLoad instance.
1856   */
1857  RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr,
1858    RegionSpecifier.Builder regionSpecifier) throws IOException {
1859    byte[] name = r.getRegionInfo().getRegionName();
1860    String regionEncodedName = r.getRegionInfo().getEncodedName();
1861    int stores = 0;
1862    int storefiles = 0;
1863    int storeRefCount = 0;
1864    int maxCompactedStoreFileRefCount = 0;
1865    long storeUncompressedSize = 0L;
1866    long storefileSize = 0L;
1867    long storefileIndexSize = 0L;
1868    long rootLevelIndexSize = 0L;
1869    long totalStaticIndexSize = 0L;
1870    long totalStaticBloomSize = 0L;
1871    long totalCompactingKVs = 0L;
1872    long currentCompactedKVs = 0L;
1873    long totalRegionSize = 0L;
1874    List<HStore> storeList = r.getStores();
1875    stores += storeList.size();
1876    for (HStore store : storeList) {
1877      storefiles += store.getStorefilesCount();
1878      int currentStoreRefCount = store.getStoreRefCount();
1879      storeRefCount += currentStoreRefCount;
1880      int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount();
1881      maxCompactedStoreFileRefCount =
1882        Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount);
1883      storeUncompressedSize += store.getStoreSizeUncompressed();
1884      storefileSize += store.getStorefilesSize();
1885      totalRegionSize += store.getHFilesSize();
      // TODO: is storefileIndexSizeKB the same as rootLevelIndexSizeKB?
1887      storefileIndexSize += store.getStorefilesRootLevelIndexSize();
1888      CompactionProgress progress = store.getCompactionProgress();
1889      if (progress != null) {
1890        totalCompactingKVs += progress.getTotalCompactingKVs();
1891        currentCompactedKVs += progress.currentCompactedKVs;
1892      }
1893      rootLevelIndexSize += store.getStorefilesRootLevelIndexSize();
1894      totalStaticIndexSize += store.getTotalStaticIndexSize();
1895      totalStaticBloomSize += store.getTotalStaticBloomSize();
1896    }
1897
1898    int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB);
1899    int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB);
1900    int storefileSizeMB = roundSize(storefileSize, unitMB);
1901    int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB);
1902    int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB);
1903    int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB);
1904    int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB);
1905    int regionSizeMB = roundSize(totalRegionSize, unitMB);
1906    final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f);
1907    computeIfPersistentBucketCache(bc -> {
1908      if (bc.getRegionCachedInfo().containsKey(regionEncodedName)) {
1909        currentRegionCachedRatio.setValue(regionSizeMB == 0
1910          ? 0.0f
1911          : (float) roundSize(bc.getRegionCachedInfo().get(regionEncodedName), unitMB)
1912            / regionSizeMB);
1913      }
1914    });
1915
1916    HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution();
1917    float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname());
1918    float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname());
1919    long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight();
1920    long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname());
1921    long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname());
1922    if (regionLoadBldr == null) {
1923      regionLoadBldr = RegionLoad.newBuilder();
1924    }
1925    if (regionSpecifier == null) {
1926      regionSpecifier = RegionSpecifier.newBuilder();
1927    }
1928
1929    regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1930    regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name));
1931    regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores)
1932      .setStorefiles(storefiles).setStoreRefCount(storeRefCount)
1933      .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount)
1934      .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB)
1935      .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB)
1936      .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1937      .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1938      .setReadRequestsCount(r.getReadRequestsCount())
1939      .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount())
1940      .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs)
1941      .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality)
1942      .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight)
1943      .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight)
1944      .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState()))
1945      .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB)
1946      .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue());
1947    r.setCompleteSequenceId(regionLoadBldr);
1948    return regionLoadBldr.build();
1949  }
1950
1951  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
1952    UserLoad.Builder userLoadBldr = UserLoad.newBuilder();
1953    userLoadBldr.setUserName(user);
1954    userSource.getClientMetrics().values().stream()
1955      .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
1956        .setHostName(clientMetrics.getHostName())
1957        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
1958        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
1959        .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
1960      .forEach(userLoadBldr::addClientMetrics);
1961    return userLoadBldr.build();
1962  }
1963
1964  public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1965    HRegion r = onlineRegions.get(encodedRegionName);
1966    return r != null ? createRegionLoad(r, null, null) : null;
1967  }
1968
1969  /**
1970   * Inner class that runs on a long period checking if regions need compaction.
1971   */
1972  private static class CompactionChecker extends ScheduledChore {
1973    private final HRegionServer instance;
1974    private final int majorCompactPriority;
    private static final int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1976    // Iteration is 1-based rather than 0-based so we don't check for compaction
1977    // immediately upon region server startup
1978    private long iteration = 1;
1979
1980    CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) {
1981      super("CompactionChecker", stopper, sleepTime);
1982      this.instance = h;
1983      LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime));
1984
1985      /*
1986       * MajorCompactPriority is configurable. If not set, the compaction will use default priority.
1987       */
1988      this.majorCompactPriority = this.instance.conf
1989        .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY);
1990    }
1991
1992    @Override
1993    protected void chore() {
1994      for (HRegion hr : this.instance.onlineRegions.values()) {
1995        // If region is read only or compaction is disabled at table level, there's no need to
1996        // iterate through region's stores
1997        if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {
1998          continue;
1999        }
2000        for (HStore s : hr.stores.values()) {
2001          try {
2002            long multiplier = s.getCompactionCheckMultiplier();
2003            assert multiplier > 0;
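            // A store can stretch the check interval: with multiplier N, the store is only
            // examined on every N-th chore iteration (illustratively, N = 100 with a 10 s chore
            // period means roughly one check per ~1000 s).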
2004            if (iteration % multiplier != 0) {
2005              continue;
2006            }
2007            if (s.needsCompaction()) {
2008              // Queue a compaction. Will recognize if major is needed.
2009              this.instance.compactSplitThread.requestSystemCompaction(hr, s,
2010                getName() + " requests compaction");
2011            } else if (s.shouldPerformMajorCompaction()) {
2012              s.triggerMajorCompaction();
2013              if (
2014                majorCompactPriority == DEFAULT_PRIORITY
2015                  || majorCompactPriority > hr.getCompactPriority()
2016              ) {
2017                this.instance.compactSplitThread.requestCompaction(hr, s,
2018                  getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
2019                  CompactionLifeCycleTracker.DUMMY, null);
2020              } else {
2021                this.instance.compactSplitThread.requestCompaction(hr, s,
2022                  getName() + " requests major compaction; use configured priority",
2023                  this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
2024              }
2025            }
2026          } catch (IOException e) {
2027            LOG.warn("Failed major compaction check on " + hr, e);
2028          }
2029        }
2030      }
2031      iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
2032    }
2033  }
2034
2035  private static class PeriodicMemStoreFlusher extends ScheduledChore {
2036    private final HRegionServer server;
    private static final int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds
    private static final int MIN_DELAY_TIME = 0; // milliseconds
2039    private final long rangeOfDelayMs;
2040
2041    PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) {
2042      super("MemstoreFlusherChore", server, cacheFlushInterval);
2043      this.server = server;
2044
2045      final long configuredRangeOfDelay = server.getConfiguration()
2046        .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY);
2047      this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay);
2048    }
2049
2050    @Override
2051    protected void chore() {
2052      final StringBuilder whyFlush = new StringBuilder();
2053      for (HRegion r : this.server.onlineRegions.values()) {
2054        if (r == null) {
2055          continue;
2056        }
2057        if (r.shouldFlush(whyFlush)) {
2058          FlushRequester requester = server.getFlushRequester();
2059          if (requester != null) {
2060            long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME;
2061            // Throttle the flushes by putting a delay. If we don't throttle, and there
2062            // is a balanced write-load on the regions in a table, we might end up
2063            // overwhelming the filesystem with too many flushes at once.
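            // E.g. with the default 5-minute range, each flush gets a random delay in
            // [0, 300000) ms.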
2064            if (requester.requestDelayedFlush(r, delay)) {
2065              LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(),
2066                r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay);
2067            }
2068          }
2069        }
2070      }
2071    }
2072  }
2073
2074  /**
   * Report the status of the server. A server is online once startup has completed (setting up
   * the filesystem, starting executorService threads, etc.). This method is designed mostly to be
2077   * useful in tests.
2078   * @return true if online, false if not.
2079   */
2080  public boolean isOnline() {
2081    return online.get();
2082  }
2083
2084  /**
2085   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
2086   * be hooked up to WAL.
2087   */
2088  private void setupWALAndReplication() throws IOException {
2089    WALFactory factory = new WALFactory(conf, serverName.toString(), (Server) this);
2090    // TODO Replication make assumptions here based on the default filesystem impl
2091    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
2092    String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
2093
2094    Path logDir = new Path(walRootDir, logName);
2095    LOG.debug("logDir={}", logDir);
2096    if (this.walFs.exists(logDir)) {
2097      throw new RegionServerRunningException(
2098        "Region server has already created directory at " + this.serverName.toString());
2099    }
    // Always create the WAL directory, as the master now needs it when it restarts in order to
    // find the live region servers.
2102    if (!this.walFs.mkdirs(logDir)) {
      throw new IOException("Cannot create WAL directory " + logDir);
2104    }
2105    // Instantiate replication if replication enabled. Pass it the log directories.
2106    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
2107
2108    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
2109    if (walEventListener != null && factory.getWALProvider() != null) {
2110      factory.getWALProvider().addWALActionsListener(walEventListener);
2111    }
2112    this.walFactory = factory;
2113  }
2114
  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
      return new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
    }
    return null;
  }
2123
  /**
   * Start up replication source and sink handlers. If a single object serves as both, it is
   * started only once.
   */
2127  private void startReplicationService() throws IOException {
2128    if (
2129      this.replicationSourceHandler == this.replicationSinkHandler
2130        && this.replicationSourceHandler != null
2131    ) {
2132      this.replicationSourceHandler.startReplicationService();
2133    } else {
2134      if (this.replicationSourceHandler != null) {
2135        this.replicationSourceHandler.startReplicationService();
2136      }
2137      if (this.replicationSinkHandler != null) {
2138        this.replicationSinkHandler.startReplicationService();
2139      }
2140    }
2141  }
2142
2143  /** Returns Master address tracker instance. */
2144  public MasterAddressTracker getMasterAddressTracker() {
2145    return this.masterAddressTracker;
2146  }
2147
2148  /**
2149   * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need
2150   * to run. This is called after we've successfully registered with the Master. Install an
2151   * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We
2152   * cannot set the handler on all threads. Server's internal Listener thread is off limits. For
   * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that
   * tries to run should trigger the same critical condition and the shutdown will run. On its way
   * out, this server will shut down Server. Leases are somewhere in between: although the lease
   * manager inherits from Chore, it keeps its own internal stop mechanism, so it needs to be
   * stopped by this hosting server. Worker logs the exception and exits.
2158   */
2159  private void startServices() throws IOException {
2160    if (!isStopped() && !isAborted()) {
2161      initializeThreads();
2162    }
2163    this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection);
2164    this.secureBulkLoadManager.start();
2165
2166    // Health checker thread.
2167    if (isHealthCheckerConfigured()) {
2168      int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
2169        HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
2170      healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
2171    }
2172
2173    this.walRoller = new LogRoller(this);
2174    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
2175    this.procedureResultReporter = new RemoteProcedureResultReporter(this);
2176
2177    // Create the CompactedFileDischarger chore executorService. This chore helps to
2178    // remove the compacted files that will no longer be used in reads.
    // Default is 2 mins. The default for the TTLCleaner is 5 mins, so we set this to 2 mins
    // so that compacted files can be archived before the TTLCleaner runs.
2181    int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
2182    this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this);
2183    choreService.scheduleChore(compactedFileDischarger);
2184
2185    // Start executor services
2186    final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
2187    executorService.startExecutorService(executorService.new ExecutorConfig()
2188      .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads));
2189    final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
2190    executorService.startExecutorService(executorService.new ExecutorConfig()
2191      .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads));
2192    final int openPriorityRegionThreads =
2193      conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
2194    executorService.startExecutorService(
2195      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION)
2196        .setCorePoolSize(openPriorityRegionThreads));
2197    final int closeRegionThreads =
2198      conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
2199    executorService.startExecutorService(executorService.new ExecutorConfig()
2200      .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads));
2201    final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
2202    executorService.startExecutorService(executorService.new ExecutorConfig()
2203      .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads));
2204    if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
2205      final int storeScannerParallelSeekThreads =
2206        conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
2207      executorService.startExecutorService(
2208        executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK)
2209          .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true));
2210    }
2211    final int logReplayOpsThreads =
2212      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
2213    executorService.startExecutorService(
2214      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS)
2215        .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
2216    // Start the threads for compacted files discharger
2217    final int compactionDischargerThreads =
2218      conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
2219    executorService.startExecutorService(executorService.new ExecutorConfig()
2220      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)
2221      .setCorePoolSize(compactionDischargerThreads));
2222    if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
2223      final int regionReplicaFlushThreads =
2224        conf.getInt("hbase.regionserver.region.replica.flusher.threads",
2225          conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
2226      executorService.startExecutorService(executorService.new ExecutorConfig()
2227        .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS)
2228        .setCorePoolSize(regionReplicaFlushThreads));
2229    }
2230    final int refreshPeerThreads =
2231      conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
2232    executorService.startExecutorService(executorService.new ExecutorConfig()
2233      .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads));
2234
2235    final int switchRpcThrottleThreads =
2236      conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
2237    executorService.startExecutorService(
2238      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE)
2239        .setCorePoolSize(switchRpcThrottleThreads));
2240    final int claimReplicationQueueThreads =
2241      conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1);
2242    executorService.startExecutorService(
2243      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE)
2244        .setCorePoolSize(claimReplicationQueueThreads));
2245    final int rsSnapshotOperationThreads =
2246      conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3);
2247    executorService.startExecutorService(
2248      executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS)
2249        .setCorePoolSize(rsSnapshotOperationThreads));
2250    final int rsFlushOperationThreads =
2251      conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3);
2252    executorService.startExecutorService(executorService.new ExecutorConfig()
2253      .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads));
2254
2255    Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
2256      uncaughtExceptionHandler);
2257    if (this.cacheFlusher != null) {
2258      this.cacheFlusher.start(uncaughtExceptionHandler);
2259    }
2260    Threads.setDaemonThreadRunning(this.procedureResultReporter,
2261      getName() + ".procedureResultReporter", uncaughtExceptionHandler);
2262
2263    if (this.compactionChecker != null) {
2264      choreService.scheduleChore(compactionChecker);
2265    }
2266    if (this.periodicFlusher != null) {
2267      choreService.scheduleChore(periodicFlusher);
2268    }
2269    if (this.healthCheckChore != null) {
2270      choreService.scheduleChore(healthCheckChore);
2271    }
2272    if (this.nonceManagerChore != null) {
2273      choreService.scheduleChore(nonceManagerChore);
2274    }
2275    if (this.storefileRefresher != null) {
2276      choreService.scheduleChore(storefileRefresher);
2277    }
2278    if (this.fsUtilizationChore != null) {
2279      choreService.scheduleChore(fsUtilizationChore);
2280    }
2281    if (this.namedQueueServiceChore != null) {
2282      choreService.scheduleChore(namedQueueServiceChore);
2283    }
2284    if (this.brokenStoreFileCleaner != null) {
2285      choreService.scheduleChore(brokenStoreFileCleaner);
2286    }
2287    if (this.rsMobFileCleanerChore != null) {
2288      choreService.scheduleChore(rsMobFileCleanerChore);
2289    }
2290    if (replicationMarkerChore != null) {
2291      LOG.info("Starting replication marker chore");
2292      choreService.scheduleChore(replicationMarkerChore);
2293    }
2294
    // The lease manager is not a Thread; internally it runs a daemon thread. If that thread gets
    // an unhandled exception, it will just exit.
2297    Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
2298      uncaughtExceptionHandler);
2299
2300    // Create the log splitting worker and start it
    // Use a smaller retry count to fail fast; otherwise the SplitLogWorker could be blocked
    // inside the Connection layer for quite a while, and the worker would not be available for
    // other tasks even after the current task is preempted when a split task times out.
2304    Configuration sinkConf = HBaseConfiguration.create(conf);
2305    sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2306      conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds
2307    sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
2308      conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
2309    sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
2310    if (
2311      this.csm != null
2312        && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
2313    ) {
2314      // SplitLogWorker needs csm. If none, don't start this.
2315      this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
2316      splitLogWorker.start();
2317      LOG.debug("SplitLogWorker started");
2318    }
2319
2320    // Memstore services.
2321    startHeapMemoryManager();
2322    // Call it after starting HeapMemoryManager.
2323    initializeMemStoreChunkCreator();
2324  }
2325
2326  private void initializeThreads() {
2327    // Cache flushing thread.
2328    this.cacheFlusher = new MemStoreFlusher(conf, this);
2329
2330    // Compaction thread
2331    this.compactSplitThread = new CompactSplit(this);
2332
2333    // Prefetch Notifier
2334    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
2335
2336    // Background thread to check for compactions; needed if region has not gotten updates
    // in a while. It will take care of not checking too frequently, on a store-by-store basis.
2338    this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this);
2339    this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
2340    this.leaseManager = new LeaseManager(this.threadWakeFrequency);
2341
2342    final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
2343      HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
2344    final boolean walEventTrackerEnabled =
2345      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
2346
2347    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
2348      // default chore duration: 10 min
      // After <version number>, we will remove the hbase.slowlog.systable.chore.duration conf
      // property.
2350      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
2351        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
2352
2353      final int namedQueueChoreDuration =
2354        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
      // Use the minimum of slowLogChoreDuration and namedQueueChoreDuration.
2356      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
2357
2358      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
2359        this.namedQueueRecorder, this.getConnection());
2360    }
2361
2362    if (this.nonceManager != null) {
2363      // Create the scheduled chore that cleans up nonces.
2364      nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
2365    }
2366
2367    // Setup the Quota Manager
2368    rsQuotaManager = new RegionServerRpcQuotaManager(this);
2369    rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
2370
2371    if (QuotaUtil.isQuotaEnabled(conf)) {
2372      this.fsUtilizationChore = new FileSystemUtilizationChore(this);
2373    }
2374
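    // A user-region refresh period of 0 disables refresh for user regions; in that case we fall
    // back to the meta-only refresh period, and the chore is created only if the resulting
    // period is positive.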
2375    boolean onlyMetaRefresh = false;
2376    int storefileRefreshPeriod =
2377      conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
2378        StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2379    if (storefileRefreshPeriod == 0) {
2380      storefileRefreshPeriod =
2381        conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
2382          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
2383      onlyMetaRefresh = true;
2384    }
2385    if (storefileRefreshPeriod > 0) {
2386      this.storefileRefresher =
2387        new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this);
2388    }
2389
2390    int brokenStoreFileCleanerPeriod =
2391      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
2392        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
2393    int brokenStoreFileCleanerDelay =
2394      conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
2395        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
2396    double brokenStoreFileCleanerDelayJitter =
2397      conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
2398        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
2399    double jitterRate =
2400      (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
2401    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
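    // Jitter arithmetic sketch (illustrative numbers): with a delay of 180000 ms and a jitter of
    // 0.5, jitterRate falls in [-0.25, 0.25), so the effective delay lands in [135000, 225000) ms.
    // This keeps region servers from all starting the cleaner in lockstep.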
2402    this.brokenStoreFileCleaner =
2403      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
2404        brokenStoreFileCleanerPeriod, this, conf, this);
2405
2406    this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
2407
2408    registerConfigurationObservers();
2409    initializeReplicationMarkerChore();
2410  }
2411
2412  private void registerConfigurationObservers() {
2413    // Register replication handlers if possible, as we now support recreating replication peer
2414    // storage for migrating across different replication peer storages online
2415    if (replicationSourceHandler instanceof ConfigurationObserver) {
2416      configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler);
2417    }
2418    if (
2419      replicationSourceHandler != replicationSinkHandler
2420        && replicationSinkHandler instanceof ConfigurationObserver
2421    ) {
2422      configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler);
2423    }
2424    // Registering the compactSplitThread object with the ConfigurationManager.
2425    configurationManager.registerObserver(this.compactSplitThread);
2426    configurationManager.registerObserver(this.cacheFlusher);
2427    configurationManager.registerObserver(this.rpcServices);
2428    configurationManager.registerObserver(this.prefetchExecutorNotifier);
2429    configurationManager.registerObserver(this);
2430  }
2431
2432  /**
2433   * Puts up the webui.
2434   */
2435  private void putUpWebUI() throws IOException {
2436    int port =
2437      this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2438    String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
2439
2440    if (this instanceof HMaster) {
2441      port = conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT);
2442      addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
2443    }
2444    // A negative port value (commonly -1) disables the info server
2445    if (port < 0) {
2446      return;
2447    }
2448
2449    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
2450      String msg = "Failed to start http info server. Address " + addr
2451        + " does not belong to this host. Correct configuration parameter: "
2452        + "hbase.regionserver.info.bindAddress";
2453      LOG.error(msg);
2454      throw new IOException(msg);
2455    }
2456    // check if auto port bind enabled
2457    boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false);
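    // Retry sketch for the loop below, assuming auto bind is enabled: if the configured port
    // (16030 by default for a region server) is already taken, bump the port by one and try
    // again until the bind succeeds; the bound port is then published back into the conf.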
2458    while (true) {
2459      try {
2460        this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
2461        infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet());
2462        configureInfoServer();
2463        this.infoServer.start();
2464        break;
2465      } catch (BindException e) {
2466        if (!auto) {
2467          // auto bind disabled; rethrow the BindException
2468          LOG.error("Failed binding http info server to port: " + port);
2469          throw e;
2470        }
2471        // auto bind enabled, try to use another port
2472        LOG.info("Failed binding http info server to port: " + port);
2473        port++;
2474        LOG.info("Retry starting http info server with port: " + port);
2475      }
2476    }
2477    port = this.infoServer.getPort();
2478    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
2479    int masterInfoPort =
2480      conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT);
2481    conf.setInt("hbase.master.info.port.orig", masterInfoPort);
2482    conf.setInt(HConstants.MASTER_INFO_PORT, port);
2483  }
2484
2485  /*
2486   * Verify that server is healthy
2487   */
2488  private boolean isHealthy() {
2489    if (!dataFsOk) {
2490      // File system problem
2491      return false;
2492    }
2493    // Verify that all threads are alive
2494    boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive())
2495      && (this.cacheFlusher == null || this.cacheFlusher.isAlive())
2496      && (this.walRoller == null || this.walRoller.isAlive())
2497      && (this.compactionChecker == null || this.compactionChecker.isScheduled())
2498      && (this.periodicFlusher == null || this.periodicFlusher.isScheduled());
2499    if (!healthy) {
2500      stop("One or more threads are no longer alive -- stop");
2501    }
2502    return healthy;
2503  }
2504
2505  @Override
2506  public List<WAL> getWALs() {
2507    return walFactory.getWALs();
2508  }
2509
2510  @Override
2511  public WAL getWAL(RegionInfo regionInfo) throws IOException {
2512    try {
2513      WAL wal = walFactory.getWAL(regionInfo);
2514      if (this.walRoller != null) {
2515        this.walRoller.addWAL(wal);
2516      }
2517      return wal;
2518    } catch (FailedCloseWALAfterInitializedErrorException ex) {
2519      // see HBASE-21751 for details
2520      abort("WAL can not clean up after init failed", ex);
2521      throw ex;
2522    }
2523  }
2524
2525  public LogRoller getWalRoller() {
2526    return walRoller;
2527  }
2528
2529  WALFactory getWalFactory() {
2530    return walFactory;
2531  }
2532
2533  @Override
2534  public Connection getConnection() {
2535    return getClusterConnection();
2536  }
2537
2538  @Override
2539  public ClusterConnection getClusterConnection() {
2540    return this.clusterConnection;
2541  }
2542
2543  @Override
2544  public void stop(final String msg) {
2545    stop(msg, false, RpcServer.getRequestUser().orElse(null));
2546  }
2547
2548  /**
2549   * Stops the regionserver.
2550   * @param msg   Status message
2551   * @param force True if this is a regionserver abort
2552   * @param user  The user executing the stop request, or null if no user is associated
2553   */
2554  public void stop(final String msg, final boolean force, final User user) {
2555    if (!this.stopped) {
2556      LOG.info("***** STOPPING region server '" + this + "' *****");
2557      if (this.rsHost != null) {
2558        // when forced via abort don't allow CPs to override
2559        try {
2560          this.rsHost.preStop(msg, user);
2561        } catch (IOException ioe) {
2562          if (!force) {
2563            LOG.warn("The region server did not stop", ioe);
2564            return;
2565          }
2566          LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2567        }
2568      }
2569      this.stopped = true;
2570      LOG.info("STOPPED: " + msg);
2571      // Wakes run() if it is sleeping
2572      sleeper.skipSleepCycle();
2573    }
2574  }
2575
2576  public void waitForServerOnline() {
2577    while (!isStopped() && !isOnline()) {
2578      synchronized (online) {
2579        try {
2580          online.wait(msgInterval);
2581        } catch (InterruptedException ie) {
2582          Thread.currentThread().interrupt();
2583          break;
2584        }
2585      }
2586    }
2587  }
2588
2589  @Override
2590  public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
2591    HRegion r = context.getRegion();
2592    long openProcId = context.getOpenProcId();
2593    long masterSystemTime = context.getMasterSystemTime();
2594    rpcServices.checkOpen();
2595    LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
2596      r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime);
2597    // Do checks to see if we need to compact (references or too many files)
2598    for (HStore s : r.stores.values()) {
2599      if (s.hasReferences() || s.needsCompaction()) {
2600        this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2601      }
2602    }
2603    long openSeqNum = r.getOpenSeqNum();
2604    if (openSeqNum == HConstants.NO_SEQNUM) {
2605      // If we opened a region, we should have read some sequence number from it.
2606      LOG.error(
2607        "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString());
2608      openSeqNum = 0;
2609    }
2610
2611    // Notify master
2612    if (
2613      !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
2614        openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))
2615    ) {
2616      throw new IOException(
2617        "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString());
2618    }
2619
2620    triggerFlushInPrimaryRegion(r);
2621
2622    LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2623  }
2624
2625  /**
2626   * Helper method for use in tests. Skip the region transition report when there's no master around
2627   * to receive it.
2628   */
2629  private boolean skipReportingTransition(final RegionStateTransitionContext context) {
2630    final TransitionCode code = context.getCode();
2631    final long openSeqNum = context.getOpenSeqNum();
2632    long masterSystemTime = context.getMasterSystemTime();
2633    final RegionInfo[] hris = context.getHris();
2634
2635    if (code == TransitionCode.OPENED) {
2636      Preconditions.checkArgument(hris != null && hris.length == 1);
2637      if (hris[0].isMetaRegion()) {
2638        LOG.warn(
2639          "meta table location is stored in master local store, so we can not skip reporting");
2640        return false;
2641      } else {
2642        try {
2643          MetaTableAccessor.updateRegionLocation(clusterConnection, hris[0], serverName, openSeqNum,
2644            masterSystemTime);
2645        } catch (IOException e) {
2646          LOG.info("Failed to update meta", e);
2647          return false;
2648        }
2649      }
2650    }
2651    return true;
2652  }
2653
2654  private ReportRegionStateTransitionRequest
2655    createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) {
2656    final TransitionCode code = context.getCode();
2657    final long openSeqNum = context.getOpenSeqNum();
2658    final RegionInfo[] hris = context.getHris();
2659    final long[] procIds = context.getProcIds();
2660
2661    ReportRegionStateTransitionRequest.Builder builder =
2662      ReportRegionStateTransitionRequest.newBuilder();
2663    builder.setServer(ProtobufUtil.toServerName(serverName));
2664    RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2665    transition.setTransitionCode(code);
2666    if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2667      transition.setOpenSeqNum(openSeqNum);
2668    }
2669    for (RegionInfo hri : hris) {
2670      transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri));
2671    }
2672    for (long procId : procIds) {
2673      transition.addProcId(procId);
2674    }
2675
2676    return builder.build();
2677  }
2678
2679  @Override
2680  public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2681    if (TEST_SKIP_REPORTING_TRANSITION) {
2682      return skipReportingTransition(context);
2683    }
2684    final ReportRegionStateTransitionRequest request =
2685      createReportRegionStateTransitionRequest(context);
2686
2687    int tries = 0;
2688    long pauseTime = this.retryPauseTime;
2689    // Keep looping till we get an error. We want to send reports even though the server is going
2690    // down. Only give up if clusterConnection is null; it is set to null almost as the last thing
2691    // the HRegionServer does as it goes down.
2692    while (this.clusterConnection != null && !this.clusterConnection.isClosed()) {
2693      RegionServerStatusService.BlockingInterface rss = rssStub;
2694      try {
2695        if (rss == null) {
2696          createRegionServerStatusStub();
2697          continue;
2698        }
2699        ReportRegionStateTransitionResponse response =
2700          rss.reportRegionStateTransition(null, request);
2701        if (response.hasErrorMessage()) {
2702          LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage());
2703          break;
2704        }
2705        // Log if we had to retry; else don't log unless TRACE. We want to
2706        // know if we were successful after an attempt showed up in the logs as failed.
2707        if (tries > 0 || LOG.isTraceEnabled()) {
2708          LOG.info("TRANSITION REPORTED " + request);
2709        }
2710        // NOTE: Return mid-method!!!
2711        return true;
2712      } catch (ServiceException se) {
2713        IOException ioe = ProtobufUtil.getRemoteException(se);
2714        boolean pause = ioe instanceof ServerNotRunningYetException
2715          || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2716        if (pause) {
2717          // Do backoff else we flood the Master with requests.
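          // Escalation sketch (multipliers come from HConstants.RETRY_BACKOFF, if unchanged):
          // with a base pause of 100 ms, successive retries wait roughly 100, 200, 300, 500,
          // 1000 ms and so on, with some jitter.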
2718          pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2719        } else {
2720          pauseTime = this.retryPauseTime; // Reset.
2721        }
2722        LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2723          + tries + ")"
2724          + (pause
2725            ? " after " + pauseTime + "ms delay (Master is coming online...)."
2726            : " immediately."),
2727          ioe);
2728        if (pause) {
2729          Threads.sleep(pauseTime);
2730        }
2731        tries++;
2732        if (rssStub == rss) {
2733          rssStub = null;
2734        }
2735      }
2736    }
2737    return false;
2738  }
2739
2740  /**
2741   * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2742   * block this thread. See RegionReplicaFlushHandler for details.
2743   */
2744  private void triggerFlushInPrimaryRegion(final HRegion region) {
2745    if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2746      return;
2747    }
2748    TableName tn = region.getTableDescriptor().getTableName();
2749    if (
2750      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2751        || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2752        // If memstore replication is not set up, we do not have to wait to observe a flush event
2753        // from the primary before starting to serve reads, because gaps from replication are not
2754        // applicable. This logic comes from
2755        // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2756        // HBASE-13063.
2757        !region.getTableDescriptor().hasRegionMemStoreReplication()
2758    ) {
2759      region.setReadsEnabled(true);
2760      return;
2761    }
2762
2763    region.setReadsEnabled(false); // disable reads before marking the region as opened.
2764    // RegionReplicaFlushHandler might reset this.
2765
2766    // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2767    if (this.executorService != null) {
2768      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
2769        rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2770    } else {
2771      LOG.info("Executor is null; not running flush of primary region replica for {}",
2772        region.getRegionInfo());
2773    }
2774  }
2775
2776  @Override
2777  public RpcServerInterface getRpcServer() {
2778    return rpcServices.rpcServer;
2779  }
2780
2781  @InterfaceAudience.Private
2782  public RSRpcServices getRSRpcServices() {
2783    return rpcServices;
2784  }
2785
2786  /**
2787   * Cause the server to exit without closing the regions it is serving, the log it is using and
2788   * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
2789   * being yanked out from under hbase, or an OOME.
2790   * @param reason the reason we are aborting
2791   * @param cause the exception that caused the abort, or null */
2792  @Override
2793  public void abort(String reason, Throwable cause) {
2794    if (!setAbortRequested()) {
2795      // Abort already in progress, ignore the new request.
2796      LOG.debug("Abort already in progress. Ignoring the current request with reason: {}", reason);
2797      return;
2798    }
2799    String msg = "***** ABORTING region server " + this + ": " + reason + " *****";
2800    if (cause != null) {
2801      LOG.error(HBaseMarkers.FATAL, msg, cause);
2802    } else {
2803      LOG.error(HBaseMarkers.FATAL, msg);
2804    }
2805    // HBASE-4014: show list of coprocessors that were loaded to help debug
2806    // regionserver crashes. Note that we're implicitly using
2807    // java.util.HashSet's toString() method to print the coprocessor names.
2808    LOG.error(HBaseMarkers.FATAL,
2809      "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors());
2810    // Try to dump metrics on abort -- might give a clue as to how the fatal error came about.
2811    try {
2812      LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics());
2813    } catch (MalformedObjectNameException | IOException e) {
2814      LOG.warn("Failed dumping metrics", e);
2815    }
2816
2817    // Do our best to report our abort to the master, but this may not work
2818    try {
2819      if (cause != null) {
2820        msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
2821      }
2822      // Report to the master but only if we have already registered with the master.
2823      RegionServerStatusService.BlockingInterface rss = rssStub;
2824      if (rss != null && this.serverName != null) {
2825        ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder();
2826        builder.setServer(ProtobufUtil.toServerName(this.serverName));
2827        builder.setErrorMessage(msg);
2828        rss.reportRSFatalError(null, builder.build());
2829      }
2830    } catch (Throwable t) {
2831      LOG.warn("Unable to report fatal error to master", t);
2832    }
2833
2834    scheduleAbortTimer();
2835    // shutdown should be run as the internal user
2836    stop(reason, true, null);
2837  }
2838
2839  /**
2840   * Sets the abort state if not already set.
2841   * @return True if abortRequested set to True successfully, false if an abort is already in
2842   *         progress.
2843   */
2844  protected boolean setAbortRequested() {
2845    return abortRequested.compareAndSet(false, true);
2846  }
2847
2848  @Override
2849  public boolean isAborted() {
2850    return abortRequested.get();
2851  }
2852
2853  /*
2854   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning up logs, but it does
2855   * close the socket in case we want to bring up a server on the old hostname+port immediately.
2856   */
2857  @InterfaceAudience.Private
2858  protected void kill() {
2859    this.killed = true;
2860    abort("Simulated kill");
2861  }
2862
2863  // Limits the time spent in the shutdown process.
2864  private void scheduleAbortTimer() {
2865    if (this.abortMonitor == null) {
2866      this.abortMonitor = new Timer("Abort regionserver monitor", true);
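      // The timeout task is pluggable: ABORT_TIMEOUT_TASK names a TimerTask class, instantiated
      // reflectively below. The default, SystemExitWhenAbortTimeout, forces a JVM exit (as its
      // name suggests) if the abort has not completed within ABORT_TIMEOUT.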
2867      TimerTask abortTimeoutTask = null;
2868      try {
2869        Constructor<? extends TimerTask> timerTaskCtor =
2870          Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName()))
2871            .asSubclass(TimerTask.class).getDeclaredConstructor();
2872        timerTaskCtor.setAccessible(true);
2873        abortTimeoutTask = timerTaskCtor.newInstance();
2874      } catch (Exception e) {
2875        LOG.warn("Initialize abort timeout task failed", e);
2876      }
2877      if (abortTimeoutTask != null) {
2878        abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT));
2879      }
2880    }
2881  }
2882
2883  protected final void shutdownChore(ScheduledChore chore) {
2884    if (chore != null) {
2885      chore.shutdown();
2886    }
2887  }
2888
2889  /**
2890   * Wait on all threads to finish. Presumption is that all closes and stops have already been
2891   * called.
2892   */
2893  protected void stopServiceThreads() {
2894    // clean up the scheduled chores
2895    if (this.choreService != null) {
2896      shutdownChore(nonceManagerChore);
2897      shutdownChore(compactionChecker);
2898      shutdownChore(compactedFileDischarger);
2899      shutdownChore(periodicFlusher);
2900      shutdownChore(healthCheckChore);
2901      shutdownChore(storefileRefresher);
2902      shutdownChore(fsUtilizationChore);
2903      shutdownChore(namedQueueServiceChore);
2904      shutdownChore(replicationMarkerChore);
2905      shutdownChore(rsMobFileCleanerChore);
2906      // cancel the remaining scheduled chores (in case we missed any)
2907      // TODO: cancel will not clean up the chores, so we need to make sure we do not miss any
2908      choreService.shutdown();
2909    }
2910    if (bootstrapNodeManager != null) {
2911      bootstrapNodeManager.stop();
2912    }
2913    if (this.cacheFlusher != null) {
2914      this.cacheFlusher.shutdown();
2915    }
2916    if (this.walRoller != null) {
2917      this.walRoller.close();
2918    }
2919    if (this.compactSplitThread != null) {
2920      this.compactSplitThread.join();
2921    }
2922    if (this.executorService != null) {
2923      this.executorService.shutdown();
2924    }
2925    if (
2926      this.replicationSourceHandler != null
2927        && this.replicationSourceHandler == this.replicationSinkHandler
2928    ) {
2929      this.replicationSourceHandler.stopReplicationService();
2930    } else {
2931      if (this.replicationSourceHandler != null) {
2932        this.replicationSourceHandler.stopReplicationService();
2933      }
2934      if (this.replicationSinkHandler != null) {
2935        this.replicationSinkHandler.stopReplicationService();
2936      }
2937    }
2938  }
2939
2940  /** Returns the object that implements the replication source service. */
2941  @InterfaceAudience.Private
2942  public ReplicationSourceService getReplicationSourceService() {
2943    return replicationSourceHandler;
2944  }
2945
2946  /** Returns the object that implements the replication sink service. */
2947  ReplicationSinkService getReplicationSinkService() {
2948    return replicationSinkHandler;
2949  }
2950
2951  /**
2952   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2953   * connection, the current rssStub must be null. Method will block until a master is available.
2954   * You can break from this block by requesting the server stop.
2955   * @return master + port, or null if server has been stopped
2956   */
2957  private synchronized ServerName createRegionServerStatusStub() {
2958    // Create RS stub without refreshing the master node from ZK, use cached data
2959    return createRegionServerStatusStub(false);
2960  }
2961
2962  /**
2963   * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh
2964   * connection, the current rssStub must be null. Method will block until a master is available.
2965   * You can break from this block by requesting the server stop.
2966   * @param refresh If true then master address will be read from ZK, otherwise use cached data
2967   * @return master + port, or null if server has been stopped
2968   */
2969  @InterfaceAudience.Private
2970  protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
2971    if (rssStub != null) {
2972      return masterAddressTracker.getMasterAddress();
2973    }
2974    ServerName sn = null;
2975    long previousLogTime = 0;
2976    RegionServerStatusService.BlockingInterface intRssStub = null;
2977    LockService.BlockingInterface intLockStub = null;
2978    boolean interrupted = false;
2979    try {
2980      while (keepLooping()) {
2981        sn = this.masterAddressTracker.getMasterAddress(refresh);
2982        if (sn == null) {
2983          if (!keepLooping()) {
2984            // give up with no connection.
2985            LOG.debug("No master found and cluster is stopped; bailing out");
2986            return null;
2987          }
2988          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
2989            LOG.debug("No master found; retry");
2990            previousLogTime = EnvironmentEdgeManager.currentTime();
2991          }
2992          refresh = true; // let's try pull it from ZK directly
2993          if (sleepInterrupted(200)) {
2994            interrupted = true;
2995          }
2996          continue;
2997        }
2998
2999        // If we are on the active master, use the shortcut
3000        if (this instanceof HMaster && sn.equals(getServerName())) {
3001          intRssStub = ((HMaster) this).getMasterRpcServices();
3002          intLockStub = ((HMaster) this).getMasterRpcServices();
3003          break;
3004        }
3005        try {
3006          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
3007            userProvider.getCurrent(), shortOperationTimeout);
3008          intRssStub = RegionServerStatusService.newBlockingStub(channel);
3009          intLockStub = LockService.newBlockingStub(channel);
3010          break;
3011        } catch (IOException e) {
3012          if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
3013            e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
3014            if (e instanceof ServerNotRunningYetException) {
3015              LOG.info("Master isn't available yet, retrying");
3016            } else {
3017              LOG.warn("Unable to connect to master. Retrying. Error was:", e);
3018            }
3019            previousLogTime = EnvironmentEdgeManager.currentTime();
3020          }
3021          if (sleepInterrupted(200)) {
3022            interrupted = true;
3023          }
3024        }
3025      }
3026    } finally {
3027      if (interrupted) {
3028        Thread.currentThread().interrupt();
3029      }
3030    }
3031    this.rssStub = intRssStub;
3032    this.lockStub = intLockStub;
3033    return sn;
3034  }
3035
3036  /**
3037   * @return True if we should keep looping, i.e. this server has not been stopped and the
3038   *         cluster is still up; false once the cluster is going down or we have been stopped.
3039   */
3040  private boolean keepLooping() {
3041    return !this.stopped && isClusterUp();
3042  }
3043
3044  /*
3045   * Let the master know we're here. Run initialization using parameters passed us by the master.
3046   * @return A Map of key/value configurations we got from the Master, else null if we failed to
3047   * register.
3048   */
3049  private RegionServerStartupResponse reportForDuty() throws IOException {
3050    if (this.masterless) {
3051      return RegionServerStartupResponse.getDefaultInstance();
3052    }
3053    ServerName masterServerName = createRegionServerStatusStub(true);
3054    RegionServerStatusService.BlockingInterface rss = rssStub;
3055    if (masterServerName == null || rss == null) {
3056      return null;
3057    }
3058    RegionServerStartupResponse result = null;
3059    try {
3060      rpcServices.requestCount.reset();
3061      rpcServices.rpcGetRequestCount.reset();
3062      rpcServices.rpcScanRequestCount.reset();
3063      rpcServices.rpcFullScanRequestCount.reset();
3064      rpcServices.rpcMultiRequestCount.reset();
3065      rpcServices.rpcMutateRequestCount.reset();
3066      LOG.info("reportForDuty to master=" + masterServerName + " with isa=" + rpcServices.isa
3067        + ", startcode=" + this.startcode);
3068      long now = EnvironmentEdgeManager.currentTime();
3069      int port = rpcServices.isa.getPort();
3070      RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
3071      if (!StringUtils.isBlank(useThisHostnameInstead)) {
3072        request.setUseThisHostnameInstead(useThisHostnameInstead);
3073      }
3074      request.setPort(port);
3075      request.setServerStartCode(this.startcode);
3076      request.setServerCurrentTime(now);
3077      result = rss.regionServerStartup(null, request.build());
3078    } catch (ServiceException se) {
3079      IOException ioe = ProtobufUtil.getRemoteException(se);
3080      if (ioe instanceof ClockOutOfSyncException) {
3081        LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
3082        // Re-throwing the IOE will cause the RS to abort
3083        throw ioe;
3084      } else if (ioe instanceof DecommissionedHostRejectedException) {
3085        LOG.error(HBaseMarkers.FATAL,
3086          "Master rejected startup because the host is considered decommissioned", ioe);
3087        // Re-throwing the IOE will cause the RS to abort
3088        throw ioe;
3089      } else if (ioe instanceof ServerNotRunningYetException) {
3090        LOG.debug("Master is not running yet");
3091      } else {
3092        LOG.warn("error telling master we are up", se);
3093      }
3094      rssStub = null;
3095    }
3096    return result;
3097  }
3098
3099  @Override
3100  public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
3101    try {
3102      GetLastFlushedSequenceIdRequest req =
3103        RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
3104      RegionServerStatusService.BlockingInterface rss = rssStub;
3105      if (rss == null) { // Try to connect one more time
3106        createRegionServerStatusStub();
3107        rss = rssStub;
3108        if (rss == null) {
3109          // Still no luck, we tried
3110          LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
3111          return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
3112            .build();
3113        }
3114      }
3115      GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
3116      return RegionStoreSequenceIds.newBuilder()
3117        .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
3118        .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
3119    } catch (ServiceException e) {
3120      LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
3121      return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
3122        .build();
3123    }
3124  }
3125
3126  /**
3127   * Close meta region if we carry it
3128   * @param abort Whether we're running an abort.
3129   */
3130  private void closeMetaTableRegions(final boolean abort) {
3131    HRegion meta = null;
3132    this.onlineRegionsLock.writeLock().lock();
3133    try {
3134      for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) {
3135        RegionInfo hri = e.getValue().getRegionInfo();
3136        if (hri.isMetaRegion()) {
3137          meta = e.getValue();
3138        }
3139        if (meta != null) {
3140          break;
3141        }
3142      }
3143    } finally {
3144      this.onlineRegionsLock.writeLock().unlock();
3145    }
3146    if (meta != null) {
3147      closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
3148    }
3149  }
3150
3151  /**
3152   * Schedule closes on all user regions. Should be safe to call multiple times because it won't
3153   * close regions that are already closed or that are closing.
3154   * @param abort Whether we're running an abort.
3155   */
3156  private void closeUserRegions(final boolean abort) {
3157    this.onlineRegionsLock.writeLock().lock();
3158    try {
3159      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
3160        HRegion r = e.getValue();
3161        if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) {
3162          // Don't update zk with this close transition; pass false.
3163          closeRegionIgnoreErrors(r.getRegionInfo(), abort);
3164        }
3165      }
3166    } finally {
3167      this.onlineRegionsLock.writeLock().unlock();
3168    }
3169  }
3170
3171  /** Returns the info server */
3172  public InfoServer getInfoServer() {
3173    return infoServer;
3174  }
3175
3176  /** Returns true if a stop has been requested. */
3177  @Override
3178  public boolean isStopped() {
3179    return this.stopped;
3180  }
3181
3182  @Override
3183  public boolean isStopping() {
3184    return this.stopping;
3185  }
3186
3187  @Override
3188  public Configuration getConfiguration() {
3189    return conf;
3190  }
3191
3192  protected Map<String, HRegion> getOnlineRegions() {
3193    return this.onlineRegions;
3194  }
3195
3196  public int getNumberOfOnlineRegions() {
3197    return this.onlineRegions.size();
3198  }
3199
3200  /**
3201   * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM
3202   * as client; HRegion cannot be serialized to cross an rpc.
3203   */
3204  public Collection<HRegion> getOnlineRegionsLocalContext() {
3205    Collection<HRegion> regions = this.onlineRegions.values();
3206    return Collections.unmodifiableCollection(regions);
3207  }
3208
3209  @Override
3210  public void addRegion(HRegion region) {
3211    this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
3212    configurationManager.registerObserver(region);
3213  }
3214
3215  /**
3216   * @return A new Map of online regions sorted by region off-heap size with the first entry being
3217   *         the biggest. If two regions are the same size, then the last one found wins; i.e. this
3218   *         method may NOT return all regions.
3219   */
3220  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() {
3221    // we'll sort the regions in reverse
3222    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
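    // Note the memstore size is the map key, so two regions of identical size collide and only
    // the last one put survives (e.g. two idle regions both at 0 bytes) -- hence the javadoc
    // caveat that not all regions may be returned.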
3223    // Copy over all regions. Regions are sorted by size with biggest first.
3224    for (HRegion region : this.onlineRegions.values()) {
3225      sortedRegions.put(region.getMemStoreOffHeapSize(), region);
3226    }
3227    return sortedRegions;
3228  }
3229
3230  /**
3231   * @return A new Map of online regions sorted by region heap size with the first entry being the
3232   *         biggest. If two regions are the same size, then the last one found wins; i.e. this
3233   *         method may NOT return all regions.
3234   */
3235  SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() {
3236    // we'll sort the regions in reverse
3237    SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder());
3238    // Copy over all regions. Regions are sorted by size with biggest first.
3239    for (HRegion region : this.onlineRegions.values()) {
3240      sortedRegions.put(region.getMemStoreHeapSize(), region);
3241    }
3242    return sortedRegions;
3243  }
3244
3245  /** Returns time stamp in millis of when this region server was started */
3246  public long getStartcode() {
3247    return this.startcode;
3248  }
3249
3250  /** Returns reference to FlushRequester */
3251  @Override
3252  public FlushRequester getFlushRequester() {
3253    return this.cacheFlusher;
3254  }
3255
3256  @Override
3257  public CompactionRequester getCompactionRequestor() {
3258    return this.compactSplitThread;
3259  }
3260
3261  @Override
3262  public LeaseManager getLeaseManager() {
3263    return leaseManager;
3264  }
3265
3266  /** Returns the rootDir. */
3267  protected Path getDataRootDir() {
3268    return dataRootDir;
3269  }
3270
3271  @Override
3272  public FileSystem getFileSystem() {
3273    return dataFs;
3274  }
3275
3276  /** Returns {@code true} when the data file system is available, {@code false} otherwise. */
3277  boolean isDataFileSystemOk() {
3278    return this.dataFsOk;
3279  }
3280
3281  /** Returns the walRootDir. */
3282  public Path getWALRootDir() {
3283    return walRootDir;
3284  }
3285
3286  /** Returns the walFs. */
3287  public FileSystem getWALFileSystem() {
3288    return walFs;
3289  }
3290
3291  @Override
3292  public String toString() {
3293    return getServerName().toString();
3294  }
3295
3296  @Override
3297  public ZKWatcher getZooKeeper() {
3298    return zooKeeper;
3299  }
3300
3301  @Override
3302  public CoordinatedStateManager getCoordinatedStateManager() {
3303    return csm;
3304  }
3305
3306  @Override
3307  public ServerName getServerName() {
3308    return serverName;
3309  }
3310
3311  public RegionServerCoprocessorHost getRegionServerCoprocessorHost() {
3312    return this.rsHost;
3313  }
3314
3315  @Override
3316  public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
3317    return this.regionsInTransitionInRS;
3318  }
3319
3320  @Override
3321  public ExecutorService getExecutorService() {
3322    return executorService;
3323  }
3324
3325  @Override
3326  public ChoreService getChoreService() {
3327    return choreService;
3328  }
3329
3330  @Override
3331  public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
3332    return rsQuotaManager;
3333  }
3334
3335  //
3336  // Main program and support routines
3337  //
3338  /**
3339   * Load the replication service objects, if any.
3340   */
3341  private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
3342    FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException {
3343    if (
3344      (server instanceof HMaster)
3345        && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))
3346    ) {
3347      return;
3348    }
3349    // read in the name of the source replication class from the config file.
3350    String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
3351      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
3352
3353    // read in the name of the sink replication class from the config file.
3354    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
3355      HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
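    // Common-case sketch: when both keys are left at HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT,
    // the two classnames match and the branch below creates a single instance that serves as both
    // source and sink.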
3356
3357    // If both the sink and the source class names are the same, then instantiate
3358    // only one object.
3359    if (sourceClassname.equals(sinkClassname)) {
3360      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3361        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
3362      server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
3363    } else {
3364      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
3365        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
3366      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
3367        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory);
3368    }
3369  }
3370
3371  private static <T extends ReplicationService> T newReplicationInstance(String classname,
3372    Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
3373    Path oldLogDir, WALFactory walFactory) throws IOException {
3374    final Class<? extends T> clazz;
3375    try {
3376      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
3377      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
3378    } catch (java.lang.ClassNotFoundException nfe) {
3379      throw new IOException("Could not find class for " + classname);
3380    }
3381    T service = ReflectionUtils.newInstance(clazz, conf);
3382    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
3383    return service;
3384  }
3385
3386  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
3387    Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>();
3388    if (!this.isOnline()) {
3389      return walGroupsReplicationStatus;
3390    }
3391    List<ReplicationSourceInterface> allSources = new ArrayList<>();
3392    allSources.addAll(replicationSourceHandler.getReplicationManager().getSources());
3393    allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources());
3394    for (ReplicationSourceInterface source : allSources) {
3395      walGroupsReplicationStatus.putAll(source.getWalGroupStatus());
3396    }
3397    return walGroupsReplicationStatus;
3398  }
3399
3400  /**
3401   * Utility for constructing an instance of the passed HRegionServer class.
3402   */
3403  static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass,
3404    final Configuration conf) {
3405    try {
3406      Constructor<? extends HRegionServer> c =
3407        regionServerClass.getConstructor(Configuration.class);
3408      return c.newInstance(conf);
3409    } catch (Exception e) {
3410      throw new RuntimeException(
3411        "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e);
3412    }
3413  }
3414
3415  /**
3416   * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
3417   */
3418  public static void main(String[] args) {
3419    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
3420    VersionInfo.logVersion();
3421    Configuration conf = HBaseConfiguration.create();
3422    @SuppressWarnings("unchecked")
3423    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
3424      .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
3425
3426    new HRegionServerCommandLine(regionServerClass).doMain(args);
3427  }
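  // Usage sketch (assuming a standard distribution layout; HRegionServerCommandLine owns the
  // authoritative argument handling):
  //   ${HBASE_HOME}/bin/hbase regionserver start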
3428
3429  /**
3430   * Gets the online regions of the specified table. This method looks at the in-memory
3431   * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions.
3432   * If a region on this table has been closed during a disable, etc., it will not be included in
3433   * the returned list. So, the returned list may not necessarily be ALL regions in this table;
3434   * it's all the ONLINE regions in the table.
3435   * @param tableName table to limit the scope of the query
3436   * @return Online regions from <code>tableName</code>
3437   */
3438  @Override
3439  public List<HRegion> getRegions(TableName tableName) {
3440    List<HRegion> tableRegions = new ArrayList<>();
3441    synchronized (this.onlineRegions) {
3442      for (HRegion region : this.onlineRegions.values()) {
3443        RegionInfo regionInfo = region.getRegionInfo();
3444        if (regionInfo.getTable().equals(tableName)) {
3445          tableRegions.add(region);
3446        }
3447      }
3448    }
3449    return tableRegions;
3450  }
3451
3452  @Override
3453  public List<HRegion> getRegions() {
3454    List<HRegion> allRegions;
3455    synchronized (this.onlineRegions) {
3456      // Return a copy of the onlineRegions
3457      allRegions = new ArrayList<>(onlineRegions.values());
3458    }
3459    return allRegions;
3460  }
3461
3462  /**
3463   * Gets the online tables in this RS. This method looks at the in-memory onlineRegions.
3464   * @return all the online tables in this RS
3465   */
3466  public Set<TableName> getOnlineTables() {
3467    Set<TableName> tables = new HashSet<>();
3468    synchronized (this.onlineRegions) {
3469      for (Region region : this.onlineRegions.values()) {
3470        tables.add(region.getTableDescriptor().getTableName());
3471      }
3472    }
3473    return tables;
3474  }
3475
3476  public String[] getRegionServerCoprocessors() {
3477    TreeSet<String> coprocessors = new TreeSet<>();
3478    try {
3479      coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
3480    } catch (IOException exception) {
3481      LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; "
3482        + "skipping.");
3483      LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3484    }
3485    Collection<HRegion> regions = getOnlineRegionsLocalContext();
3486    for (HRegion region : regions) {
3487      coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
3488      try {
3489        coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
3490      } catch (IOException exception) {
3491        LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region
3492          + "; skipping.");
3493        LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
3494      }
3495    }
3496    coprocessors.addAll(rsHost.getCoprocessors());
3497    return coprocessors.toArray(new String[0]);
3498  }
3499
3500  /**
3501   * Try to close the region; logs a warning on failure but continues.
3502   * @param region Region to close
3503   * @param abort  Whether we're running an abort. */
3504  private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) {
3505    try {
3506      if (!closeRegion(region.getEncodedName(), abort, null)) {
3507        LOG
3508          .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing");
3509      }
3510    } catch (IOException e) {
3511      LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing",
3512        e);
3513    }
3514  }
3515
3516  /**
3517   * Asynchronously close a region; can be called from the master or internally by the regionserver
3518   * when stopping. If called from the master, the region will update the status.
3519   * <p>
3520   * If an opening was in progress, this method will cancel it, but will not start a new close. The
3521   * coprocessors are not called in this case. A NotServingRegionException exception is thrown.
3522   * </p>
3523   * <p>
3524   * If a close was in progress, this new request will be ignored, and an exception thrown.
3525   * </p>
3526   * <p>
3527   * Provides an additional flag to indicate if this region's blocks should be evicted from the cache.
3528   * </p>
3529   * @param encodedName Region to close
3530   * @param abort       True if we are aborting
3531   * @param destination Where the Region is being moved to... maybe null if unknown.
3532   * @return True if closed a region.
3533   * @throws NotServingRegionException if the region is not online
3534   */
3535  protected boolean closeRegion(String encodedName, final boolean abort,
3536    final ServerName destination) throws NotServingRegionException {
3537    // Check for permissions to close.
3538    HRegion actualRegion = this.getRegion(encodedName);
3539    // Can be null if we're calling close on a region that's not online
3540    if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3541      try {
3542        actualRegion.getCoprocessorHost().preClose(false);
3543      } catch (IOException exp) {
3544        LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3545        return false;
3546      }
3547    }
3548
3549    // previous can come back 'null' if not in map.
3550    final Boolean previous =
3551      this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
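    // Convention for regionsInTransitionInRS, as exercised below: Boolean.TRUE marks a region
    // as OPENING, Boolean.FALSE marks it as CLOSING, and absence means no transition in flight.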
3552
3553    if (Boolean.TRUE.equals(previous)) {
3554      LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3555        + "trying to OPEN. Cancelling OPENING.");
3556      if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3557        // The replace failed. That should be an exceptional case, but theoretically it can happen.
3558        // We're going to try to do a standard close then.
3559        LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3560          + " Doing a standard close now");
3561        return closeRegion(encodedName, abort, destination);
3562      }
3563      // Let's get the region from the online region list again
3564      actualRegion = this.getRegion(encodedName);
3565      if (actualRegion == null) { // If already online, we still need to close it.
3566        LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3567        // The master deletes the znode when it receives this exception.
3568        throw new NotServingRegionException(
3569          "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3570      }
3571    } else if (previous == null) {
3572      LOG.info("Received CLOSE for {}", encodedName);
3573    } else if (Boolean.FALSE.equals(previous)) {
3574      LOG.info("Received CLOSE for the region: " + encodedName
3575        + ", which we are already trying to CLOSE, but not completed yet");
3576      return true;
3577    }
3578
3579    if (actualRegion == null) {
3580      LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3581      this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3582      // The master deletes the znode when it receives this exception.
3583      throw new NotServingRegionException(
3584        "The region " + encodedName + " is not online, and is not opening.");
3585    }
3586
3587    CloseRegionHandler crh;
3588    final RegionInfo hri = actualRegion.getRegionInfo();
3589    if (hri.isMetaRegion()) {
3590      crh = new CloseMetaHandler(this, this, hri, abort);
3591    } else {
3592      crh = new CloseRegionHandler(this, this, hri, abort, destination);
3593    }
3594    this.executorService.submit(crh);
3595    return true;
3596  }
3597
3598  /**
3599   * @return HRegion for the passed binary <code>regionName</code>, or null if the named region
3600   *         is not a member of the online regions.
3601   */
3602  public HRegion getOnlineRegion(final byte[] regionName) {
3603    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3604    return this.onlineRegions.get(encodedRegionName);
3605  }
3606
3607  @Override
3608  public HRegion getRegion(final String encodedRegionName) {
3609    return this.onlineRegions.get(encodedRegionName);
3610  }
3611
3612  @Override
3613  public boolean removeRegion(final HRegion r, ServerName destination) {
3614    HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3615    metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName());
3616    if (destination != null) {
3617      long closeSeqNum = r.getMaxFlushedSeqId();
3618      if (closeSeqNum == HConstants.NO_SEQNUM) {
3619        // No edits in WAL for this region; get the sequence number when the region was opened.
3620        closeSeqNum = r.getOpenSeqNum();
3621        if (closeSeqNum == HConstants.NO_SEQNUM) {
3622          closeSeqNum = 0;
3623        }
3624      }
3625      boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
3626      addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
3627      if (selfMove) {
3628        this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
3629          r.getRegionInfo().getEncodedName(),
3630          new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
3631      }
3632    }
3633    this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3634    return toReturn != null;
3635  }
3636
3637  /**
3638   * Protected utility method for safely obtaining an HRegion handle.
3639   * @param regionName Name of online {@link HRegion} to return
3640   * @return {@link HRegion} for <code>regionName</code>
3641   */
3642  protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException {
3643    String encodedRegionName = RegionInfo.encodeRegionName(regionName);
3644    return getRegionByEncodedName(regionName, encodedRegionName);
3645  }
3646
3647  public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException {
3648    return getRegionByEncodedName(null, encodedRegionName);
3649  }
3650
3651  private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3652    throws NotServingRegionException {
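    // Resolution order below: an online region wins; otherwise a recently moved region raises
    // RegionMovedException (pointing at the new location), a region mid-open raises
    // RegionOpeningException, and anything else raises NotServingRegionException.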
3653    HRegion region = this.onlineRegions.get(encodedRegionName);
3654    if (region == null) {
3655      MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3656      if (moveInfo != null) {
3657        throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3658      }
3659      Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3660      String regionNameStr =
3661        regionName == null ? encodedRegionName : Bytes.toStringBinary(regionName);
3662      if (isOpening != null && isOpening) {
3663        throw new RegionOpeningException(
3664          "Region " + regionNameStr + " is opening on " + this.serverName);
3665      }
3666      throw new NotServingRegionException(
3667        "" + regionNameStr + " is not online on " + this.serverName);
3668    }
3669    return region;
3670  }
3671
3672  /**
3673   * Cleanup after a Throwable is caught invoking a method. Converts <code>t</code> to an IOE if
3674   * it isn't one already.
3675   * @param t   Throwable
3676   * @param msg Message to log in error. Can be null.
3677   * @return Throwable converted to an IOE; methods can only let out IOEs.
3678   */
3679  private Throwable cleanup(final Throwable t, final String msg) {
3680    // Don't log as error if NSRE; NSRE is 'normal' operation.
3681    if (t instanceof NotServingRegionException) {
3682      LOG.debug("NotServingRegionException; " + t.getMessage());
3683      return t;
3684    }
3685    Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t;
3686    if (msg == null) {
3687      LOG.error("", e);
3688    } else {
3689      LOG.error(msg, e);
3690    }
3691    if (!rpcServices.checkOOME(t)) {
3692      checkFileSystem();
3693    }
3694    return t;
3695  }
3696
3697  /**
3698   * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
3699   * @return <code>t</code> as an IOE, wrapping it if it isn't one already.
3700   */
3701  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
3702    return (t instanceof IOException ? (IOException) t
3703      : msg == null || msg.length() == 0 ? new IOException(t)
3704      : new IOException(msg, t));
3705  }
3706
3707  /**
3708   * Checks to see if the file system is still accessible. If not, sets abortRequested and
3709   * stopRequested.
3710   * @return false if the file system is not available
3711   */
3712  boolean checkFileSystem() {
3713    if (this.dataFsOk && this.dataFs != null) {
3714      try {
3715        FSUtils.checkFileSystemAvailable(this.dataFs);
3716      } catch (IOException e) {
3717        abort("File System not available", e);
3718        this.dataFsOk = false;
3719      }
3720    }
3721    return this.dataFsOk;
3722  }
3723
3724  @Override
3725  public void updateRegionFavoredNodesMapping(String encodedRegionName,
3726    List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3727    Address[] addr = new Address[favoredNodes.size()];
3728    // Refer to the comment on the declaration of regionFavoredNodesMap on why
3729    // it is a map of region name to Address[]
3730    for (int i = 0; i < favoredNodes.size(); i++) {
3731      addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort());
3732    }
3733    regionFavoredNodesMap.put(encodedRegionName, addr);
3734  }
3735
3736  /**
3737   * Return the favored nodes for a region given its encoded name. Look at the comment around
3738   * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here.
3739   * @param encodedRegionName the encoded region name.
3740   * @return array of favored locations
3741   */
3742  @Override
3743  public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3744    return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName));
3745  }
3746
3747  @Override
3748  public ServerNonceManager getNonceManager() {
3749    return this.nonceManager;
3750  }
3751
  private static class MovedRegionInfo {
    private final ServerName serverName;
    private final long seqNum;

    MovedRegionInfo(ServerName serverName, long closeSeqNum) {
      this.serverName = serverName;
      this.seqNum = closeSeqNum;
    }

    public ServerName getServerName() {
      return serverName;
    }

    public long getSeqNum() {
      return seqNum;
    }
  }

  /**
   * Expiry for entries in the moved-regions cache. We need a timeout: without one there is a risk
   * of handing out stale information, which would double the number of network calls instead of
   * reducing them.
   */
  private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

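  // For illustration, a cache honoring this expiry could be built along the following lines; the
  // actual movedRegionInfoCache is constructed elsewhere in this class, and the Caffeine usage
  // here is an assumption, not a statement of how that field is built:
  //
  //   Cache<String, MovedRegionInfo> cache = Caffeine.newBuilder()
  //     .expireAfterWrite(TIMEOUT_REGION_MOVED, TimeUnit.MILLISECONDS)
  //     .build();
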
  private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum,
    boolean selfMove) {
    if (selfMove) {
      LOG.warn("Not adding moved region record: " + encodedName + " to self.");
      return;
    }
    LOG.info("Adding moved region record: " + encodedName + " to " + destination
      + " at close sequenceid=" + closeSeqNum);
    movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
  }

  void removeFromMovedRegions(String encodedName) {
    movedRegionInfoCache.invalidate(encodedName);
  }

  @InterfaceAudience.Private
  public MovedRegionInfo getMovedRegion(String encodedRegionName) {
    return movedRegionInfoCache.getIfPresent(encodedRegionName);
  }

  @InterfaceAudience.Private
  public int movedRegionCacheExpiredTime() {
    return TIMEOUT_REGION_MOVED;
  }

  private String getMyEphemeralNodePath() {
    return zooKeeper.getZNodePaths().getRsPath(serverName);
  }

  private boolean isHealthCheckerConfigured() {
    String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
    return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation);
  }

  /** Returns the underlying {@link CompactSplit} for this server. */
  public CompactSplit getCompactSplitThread() {
    return this.compactSplitThread;
  }

  CoprocessorServiceResponse execRegionServerService(
    @SuppressWarnings("UnusedParameters") final RpcController controller,
    final CoprocessorServiceRequest serviceRequest) throws ServiceException {
    try {
      ServerRpcController serviceController = new ServerRpcController();
      CoprocessorServiceCall call = serviceRequest.getCall();
      String serviceName = call.getServiceName();
      com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName);
      if (service == null) {
        throw new UnknownProtocolException(null,
          "No registered coprocessor executorService found for " + serviceName);
      }
      com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc =
        service.getDescriptorForType();

      String methodName = call.getMethodName();
      com.google.protobuf.Descriptors.MethodDescriptor methodDesc =
        serviceDesc.findMethodByName(methodName);
      if (methodDesc == null) {
        throw new UnknownProtocolException(service.getClass(),
          "Unknown method " + methodName + " called on executorService " + serviceName);
      }

      com.google.protobuf.Message request =
        CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
      final com.google.protobuf.Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
      service.callMethod(methodDesc, serviceController, request, message -> {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      });
      IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
      if (exception != null) {
        throw exception;
      }
      return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

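  // A hedged client-side sketch of how a request reaches the dispatch above. "MyService" and its
  // request/response messages are hypothetical generated protobuf classes, and the Admin-based
  // channel lookup is an assumption for illustration:
  //
  //   CoprocessorRpcChannel channel = admin.coprocessorService(serverName);
  //   MyService.BlockingInterface stub = MyService.newBlockingStub(channel);
  //   MyResponse response = stub.myMethod(null, MyRequest.getDefaultInstance());
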
  /**
   * May be empty if this is a master which does not carry tables.
   * @return The block cache instance used by the regionserver.
   */
  @Override
  public Optional<BlockCache> getBlockCache() {
    return Optional.ofNullable(this.blockCache);
  }

  /**
   * May be empty if this is a master which does not carry tables.
   * @return The cache for mob files used by the regionserver.
   */
  @Override
  public Optional<MobFileCache> getMobFileCache() {
    return Optional.ofNullable(this.mobFileCache);
  }

  @Override
  public AccessChecker getAccessChecker() {
    return rpcServices.getAccessChecker();
  }

  @Override
  public ZKPermissionWatcher getZKPermissionWatcher() {
    return rpcServices.getZkPermissionWatcher();
  }

  /** Returns the ConfigurationManager object for testing purposes. */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ConfigurationManager getConfigurationManager() {
    return configurationManager;
  }

  /** Returns the table descriptors implementation. */
  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  /**
   * Reload the configuration from disk.
   */
  void updateConfiguration() throws IOException {
    LOG.info("Reloading the configuration from disk.");
    // Reload the configuration from disk.
    preUpdateConfiguration();
    conf.reloadConfiguration();
    configurationManager.notifyAllObservers(conf);
    postUpdateConfiguration();
  }

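  // A hedged sketch of how this reload is typically triggered from a client, assuming an open
  // Admin handle ("clientConf" and "serverName" are illustrative names):
  //
  //   try (Connection conn = ConnectionFactory.createConnection(clientConf);
  //       Admin admin = conn.getAdmin()) {
  //     admin.updateConfiguration(serverName); // ends up invoking updateConfiguration() above
  //   }
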
  protected void preUpdateConfiguration() throws IOException {
    if (rsHost != null) {
      rsHost.preUpdateConfiguration(conf);
    }
  }

  protected void postUpdateConfiguration() throws IOException {
    if (rsHost != null) {
      rsHost.postUpdateConfiguration(conf);
    }
  }

  CacheEvictionStats clearRegionBlockCache(Region region) {
    long evictedBlocks = 0;

    for (Store store : region.getStores()) {
      for (StoreFile hFile : store.getStorefiles()) {
        evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName());
      }
    }

    return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build();
  }

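  // A hedged sketch of the client-side entry point that fans out to per-region eviction like the
  // method above, assuming the Admin API's clearBlockCache (the table name is illustrative):
  //
  //   CacheEvictionStats stats = admin.clearBlockCache(TableName.valueOf("my_table"));
  //   LOG.info("Evicted {} blocks", stats.getEvictedBlocks());
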
  @Override
  public double getCompactionPressure() {
    double max = 0;
    for (Region region : onlineRegions.values()) {
      for (Store store : region.getStores()) {
        double normCount = store.getCompactionPressure();
        if (normCount > max) {
          max = normCount;
        }
      }
    }
    return max;
  }

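  // For context, a hedged note on the per-store value maxed over above: each store is expected to
  // report a normalized pressure, roughly
  //
  //   (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact)
  //
  // so 0.0 means no compaction is needed and values >= 1.0 mean the store has reached its
  // blocking file count. (This formula describes the store-level contract as an assumption; it is
  // not defined in this class.)
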
  @Override
  public HeapMemoryManager getHeapMemoryManager() {
    return hMemManager;
  }

  public MemStoreFlusher getMemStoreFlusher() {
    return cacheFlusher;
  }

  /**
   * For testing.
   * @return whether all WAL roll requests have finished for this regionserver
   */
  @InterfaceAudience.Private
  public boolean walRollRequestFinished() {
    return this.walRoller.walRollFinished();
  }

  @Override
  public ThroughputController getFlushThroughputController() {
    return flushThroughputController;
  }

  @Override
  public double getFlushPressure() {
    if (getRegionServerAccounting() == null || cacheFlusher == null) {
      // return 0 during RS initialization
      return 0.0;
    }
    return getRegionServerAccounting().getFlushPressure();
  }

  @Override
  public void onConfigurationChange(Configuration newConf) {
    ThroughputController old = this.flushThroughputController;
    if (old != null) {
      old.stop("configuration change");
    }
    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
    try {
      Superusers.initialize(newConf);
    } catch (IOException e) {
      LOG.warn("Failed to initialize SuperUsers on reloading of the configuration", e);
    }

    // update region server coprocessor if the configuration has changed.
    if (
      CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf,
        CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY)
    ) {
      LOG.info("Update region server coprocessors because the configuration has changed");
      this.rsHost = new RegionServerCoprocessorHost(this, newConf);
    }
  }

  @Override
  public MetricsRegionServer getMetrics() {
    return metricsRegionServer;
  }

  @Override
  public SecureBulkLoadManager getSecureBulkLoadManager() {
    return this.secureBulkLoadManager;
  }

  @Override
  public EntityLock regionLock(final List<RegionInfo> regionInfos, final String description,
    final Abortable abort) {
    return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
      .regionLock(regionInfos, description, abort);
  }

  @Override
  public void unassign(byte[] regionName) throws IOException {
    clusterConnection.getAdmin().unassign(regionName, false);
  }

  @Override
  public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
    return this.rsSpaceQuotaManager;
  }

  @Override
  public boolean reportFileArchivalForQuotas(TableName tableName,
    Collection<Entry<String, Long>> archivedFiles) {
    if (TEST_SKIP_REPORTING_TRANSITION) {
      return false;
    }
    RegionServerStatusService.BlockingInterface rss = rssStub;
    if (rss == null || rsSpaceQuotaManager == null) {
      // the current server could be stopping.
      LOG.trace("Skipping file archival reporting to HMaster as stub is null");
      return false;
    }
    try {
      RegionServerStatusProtos.FileArchiveNotificationRequest request =
        rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles);
      rss.reportFileArchival(null, request);
    } catch (ServiceException se) {
      IOException ioe = ProtobufUtil.getRemoteException(se);
      if (ioe instanceof PleaseHoldException) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Failed to report file archival(s) to Master because it is initializing."
            + " This will be retried.", ioe);
        }
        // The Master is coming up. Will retry the report later. Avoid re-creating the stub.
        return false;
      }
      if (rssStub == rss) {
        rssStub = null;
      }
      // re-create the stub if we failed to report the archival
      createRegionServerStatusStub(true);
      LOG.debug("Failed to report file archival(s) to Master. This will be retried.", ioe);
      return false;
    }
    return true;
  }

  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
    return eventLoopGroupConfig;
  }

  @Override
  public Connection createConnection(Configuration conf) throws IOException {
    User user = UserProvider.instantiate(conf).getCurrent();
    return ServerConnectionUtils.createShortCircuitConnection(conf, user, this.serverName,
      this.rpcServices, this.rpcServices, new RegionServerRegistry(this));
  }

  void executeProcedure(long procId, RSProcedureCallable callable) {
    executorService.submit(new RSProcedureHandler(this, procId, callable));
  }

  public void remoteProcedureComplete(long procId, Throwable error) {
    procedureResultReporter.complete(procId, error);
  }

  void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
    RegionServerStatusService.BlockingInterface rss;
    // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
    for (;;) {
      rss = rssStub;
      if (rss != null) {
        break;
      }
      createRegionServerStatusStub();
    }
    try {
      rss.reportProcedureDone(null, request);
    } catch (ServiceException se) {
      if (rssStub == rss) {
        rssStub = null;
      }
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Ignores open/close region procedures that were already submitted or executed. When a master
   * with unfinished open/close region procedures restarts, the new active master may resend
   * duplicate open/close region requests to this regionserver. Each open/close request is
   * submitted to a thread pool and executed asynchronously, so we first need a cache of submitted
   * open/close region procedures. Once an open/close request has executed and the region
   * transition has been reported successfully, it is recorded in the executed region procedures
   * cache; see {@link #finishRegionProcedure(long)}. After a successful report of the region
   * transition, the master will not resend the open/close request. We assume an in-flight
   * duplicate open/close request will not be delayed by more than 600 seconds, so the executed
   * region procedures cache expires entries after 600 seconds. See HBASE-22404 for more details.
   * @param procId the id of the open/close region procedure
   * @return true if the procedure can be submitted.
   */
  boolean submitRegionProcedure(long procId) {
    if (procId == -1) {
      return true;
    }
    // Ignore the region procedures which were already submitted.
    Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
    if (previous != null) {
      LOG.warn("Received procedure pid={}, which was already submitted, just ignore it", procId);
      return false;
    }
    // Ignore the region procedures which were already executed.
    if (executedRegionProcedures.getIfPresent(procId) != null) {
      LOG.warn("Received procedure pid={}, which was already executed, just ignore it", procId);
      return false;
    }
    return true;
  }

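  // A minimal sketch of the intended handshake around submitRegionProcedure() and
  // finishRegionProcedure() below; executeOpenOrClose() and reportRegionTransition() stand in
  // for the real handler logic:
  //
  //   if (regionServer.submitRegionProcedure(procId)) {
  //     executeOpenOrClose();       // runs in the open/close thread pool
  //     reportRegionTransition();   // tell the master the transition succeeded
  //     regionServer.finishRegionProcedure(procId);
  //   }
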
  /**
   * See {@link #submitRegionProcedure(long)}.
   * @param procId the id of the open/close region procedure
   */
  public void finishRegionProcedure(long procId) {
    executedRegionProcedures.put(procId, procId);
    submittedRegionProcedures.remove(procId);
  }

  public boolean isShutDown() {
    return shutDown;
  }

  /**
   * Forcibly terminates the region server if aborting times out.
   */
  private static class SystemExitWhenAbortTimeout extends TimerTask {

    public SystemExitWhenAbortTimeout() {
    }

    @Override
    public void run() {
      LOG.warn("Aborting region server timed out; terminating forcibly without waiting"
        + " for any running shutdown hooks or finalizers to finish their work."
        + " Thread dump to stdout.");
      Threads.printThreadInfo(System.out, "Zombie HRegionServer");
      Runtime.getRuntime().halt(1);
    }
  }

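  // A hedged sketch of how this task would be armed during abort, assuming an abort-timeout value
  // read from configuration (the timer name and "abortTimeoutMillis" are illustrative):
  //
  //   Timer abortMonitor = new Timer("Abort regionserver monitor", true);
  //   abortMonitor.schedule(new SystemExitWhenAbortTimeout(), abortTimeoutMillis);
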
  @InterfaceAudience.Private
  public CompactedHFilesDischarger getCompactedHFilesDischarger() {
    return compactedFileDischarger;
  }

  /**
   * Return the pause time configured in
   * {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}.
   * @return pause time
   */
  @InterfaceAudience.Private
  public long getRetryPauseTime() {
    return this.retryPauseTime;
  }

  public Optional<ServerName> getActiveMaster() {
    return Optional.ofNullable(masterAddressTracker.getMasterAddress());
  }

  public List<ServerName> getBackupMasters() {
    return masterAddressTracker.getBackupMasters();
  }

  public Iterator<ServerName> getBootstrapNodes() {
    return bootstrapNodeManager.getBootstrapNodes().iterator();
  }

  public MetaRegionLocationCache getMetaRegionLocationCache() {
    return this.metaRegionLocationCache;
  }

  @InterfaceAudience.Private
  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
    return brokenStoreFileCleaner;
  }

  @InterfaceAudience.Private
  public RSMobFileCleanerChore getRSMobFileCleanerChore() {
    return rsMobFileCleanerChore;
  }

  RSSnapshotVerifier getRsSnapshotVerifier() {
    return rsSnapshotVerifier;
  }
}