/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.ChoreService.CHORE_SERVICE_INITIAL_POOL_SIZE;
import static org.apache.hadoop.hbase.ChoreService.DEFAULT_CHORE_SERVICE_INITIAL_POOL_SIZE;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.mutable.MutableFloat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.FailedCloseWALAfterInitializedErrorException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionServerRegistry;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.ServerConnectionUtils;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSize;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAuthentication;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.net.InetAddresses;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;

/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There
 * are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "deprecation" })
public class HRegionServer extends Thread
  implements RegionServerServices, LastSequenceId, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(HRegionServer.class);
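  // Size unit constants (1 MB and 1 KB) used when converting byte counts, e.g. for load reports.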
  int unitMB = 1024 * 1024;
  int unitKB = 1024;

  /**
   * For testing only! Set to true to skip notifying region assignment to master.
   */
  @InterfaceAudience.Private
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
  public static boolean TEST_SKIP_REPORTING_TRANSITION = false;

  /**
   * A map from RegionName to current action in progress. Boolean value indicates: true - if an open
   * region action is in progress, false - if a close region action is in progress.
   */
  private final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);

  /**
   * Used to cache the open/close region procedures which have already been submitted. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final ConcurrentMap<Long, Long> submittedRegionProcedures = new ConcurrentHashMap<>();
  /**
   * Used to cache the open/close region procedures which have already been executed. See
   * {@link #submitRegionProcedure(long)}.
   */
  private final Cache<Long, Long> executedRegionProcedures =
    CacheBuilder.newBuilder().expireAfterAccess(600, TimeUnit.SECONDS).build();

  /**
   * Used to cache the moved-out regions
   */
  private final Cache<String, MovedRegionInfo> movedRegionInfoCache = CacheBuilder.newBuilder()
    .expireAfterWrite(movedRegionCacheExpiredTime(), TimeUnit.MILLISECONDS).build();

  private MemStoreFlusher cacheFlusher;

  private HeapMemoryManager hMemManager;

  /**
   * Cluster connection to be shared by services. Initialized at server startup and closed when
   * server shuts down. Clients must never close it explicitly. Clients hosted by this Server should
   * make use of this clusterConnection rather than create their own; if they create their own,
   * there is no way for the hosting server to shutdown ongoing client RPCs.
   */
  protected ClusterConnection clusterConnection;

  /**
   * Go here to get table descriptors.
   */
  protected TableDescriptors tableDescriptors;

  // Replication services. If no replication, this handler will be null.
  private ReplicationSourceService replicationSourceHandler;
  private ReplicationSinkService replicationSinkHandler;

  // Compactions
  private CompactSplit compactSplitThread;

  /**
   * Map of regions currently being served by this region server. Key is the encoded region name.
   * All access should be synchronized.
   */
  private final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<>();
  /**
   * Lock for gating access to {@link #onlineRegions}. TODO: If this map is gated by a lock, does it
   * need to be a ConcurrentHashMap?
   */
  private final ReentrantReadWriteLock onlineRegionsLock = new ReentrantReadWriteLock();

  /**
   * Map of encoded region names to the DataNode locations they should be hosted on. We store the
   * value as Address since InetSocketAddress is required by the HDFS API (create() that takes
   * favored nodes as hints for placing file blocks). We could have used ServerName here as the
   * value class, but we'd need to convert it to InetSocketAddress at some point before the HDFS API
   * call, and it seems a bit weird to store ServerName since ServerName refers to RegionServers and
   * here we really mean DataNode locations. We don't store it as InetSocketAddress here because the
   * conversion on demand from Address to InetSocketAddress will guarantee the resolution results
   * will be fresh when we need it.
   */
  private final Map<String, Address[]> regionFavoredNodesMap = new ConcurrentHashMap<>();

  private LeaseManager leaseManager;
  // Instance of the hbase executor executorService.
  protected ExecutorService executorService;

  private volatile boolean dataFsOk;
  private HFileSystem dataFs;
  private HFileSystem walFs;

  // Set when a report to the master comes back with a message asking us to
  // shutdown. Also set by call to stop when debugging or running unit tests
  // of HRegionServer in isolation.
  private volatile boolean stopped = false;
  // Only for testing
  private boolean isShutdownHookInstalled = false;

  // Go down hard. Used if file system becomes unavailable and also in
  // debugging and unit tests.
  private AtomicBoolean abortRequested;
  static final String ABORT_TIMEOUT = "hbase.regionserver.abort.timeout";
  // Default abort timeout is 1200 seconds for safety
  private static final long DEFAULT_ABORT_TIMEOUT = 1200000;
  // Will run this task when the abort times out
  static final String ABORT_TIMEOUT_TASK = "hbase.regionserver.abort.timeout.task";

  // A state before we go into stopped state. At this stage we're closing user
  // space regions.
  private boolean stopping = false;
  private volatile boolean killed = false;
  private volatile boolean shutDown = false;

  protected final Configuration conf;

  private Path dataRootDir;
  private Path walRootDir;

  private final int threadWakeFrequency;
  final int msgInterval;

  private static final String PERIOD_COMPACTION = "hbase.regionserver.compaction.check.period";
  private final int compactionCheckFrequency;
  private static final String PERIOD_FLUSH = "hbase.regionserver.flush.check.period";
  private final int flushCheckFrequency;

  // Stub to do region server status calls against the master.
  private volatile RegionServerStatusService.BlockingInterface rssStub;
  private volatile LockService.BlockingInterface lockStub;
  // RPC client. Used to make the stub above that does region server status checking.
  private RpcClient rpcClient;

  private RpcRetryingCallerFactory rpcRetryingCallerFactory;
  private RpcControllerFactory rpcControllerFactory;

  private UncaughtExceptionHandler uncaughtExceptionHandler;

  // Info server. Default access so can be used by unit tests. REGIONSERVER
  // is name of the webapp and the attribute name used stuffing this instance
  // into web context.
  protected InfoServer infoServer;
  private JvmPauseMonitor pauseMonitor;

  private RSSnapshotVerifier rsSnapshotVerifier;

  /** region server process name */
  public static final String REGIONSERVER = "regionserver";

  private MetricsRegionServer metricsRegionServer;
  MetricsRegionServerWrapperImpl metricsRegionServerImpl;

  /**
   * ChoreService used to schedule tasks that we want to run periodically
   */
  private ChoreService choreService;
  /**
   * Check for compaction requests.
   */
  private ScheduledChore compactionChecker;

  /**
   * Check for flushes
   */
  private ScheduledChore periodicFlusher;

  private volatile WALFactory walFactory;

  private LogRoller walRoller;

  // A thread which calls reportProcedureDone
  private RemoteProcedureResultReporter procedureResultReporter;

  // flag set after we're done setting up server threads
  final AtomicBoolean online = new AtomicBoolean(false);

  // zookeeper connection and watcher
  protected final ZKWatcher zooKeeper;

  // master address tracker
  private final MasterAddressTracker masterAddressTracker;

  /**
   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
   * entries. Used for serving ClientMetaService.
   */
  private final MetaRegionLocationCache metaRegionLocationCache;

  // Cluster Status Tracker
  protected final ClusterStatusTracker clusterStatusTracker;

  // Log Splitting Worker
  private SplitLogWorker splitLogWorker;

  // A sleeper that sleeps for msgInterval.
  protected final Sleeper sleeper;

  private final int operationTimeout;
  private final int shortOperationTimeout;

  // Time to pause if master says 'please hold'
  private final long retryPauseTime;

  private final RegionServerAccounting regionServerAccounting;

  private NamedQueueServiceChore namedQueueServiceChore = null;

  // Block cache
  private BlockCache blockCache;
  // The cache for mob files
  private MobFileCache mobFileCache;

  /** The health check chore. */
  private HealthCheckChore healthCheckChore;

  /** The nonce manager chore. */
  private ScheduledChore nonceManagerChore;

  private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();

  /**
   * The server name the Master sees us as. It's made from the hostname the master passes us, port,
   * and server startcode. Gets set after registration against Master.
   */
  protected ServerName serverName;

  /**
   * hostname specified by hostname config
   */
  protected String useThisHostnameInstead;

  /**
   * @deprecated since 2.4.0 and will be removed in 4.0.0. Use
   *             {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.regionserver.hostname.disable.master.reversedns";

  /**
   * HBASE-18226: This config and hbase.unsafe.regionserver.hostname are mutually exclusive.
   * Exception will be thrown if both are used.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
  final static String UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
    "hbase.unsafe.regionserver.hostname.disable.master.reversedns";
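  // Together with the hostname and RPC port, the startcode below forms this server's ServerName.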
  /**
   * This server's startcode.
   */
  protected final long startcode;

  /**
   * Unique identifier for the cluster we are a part of.
   */
  protected String clusterId;

  // chore for refreshing store files for secondary regions
  private StorefileRefresherChore storefileRefresher;

  private volatile RegionServerCoprocessorHost rsHost;

  private RegionServerProcedureManagerHost rspmHost;

  private RegionServerRpcQuotaManager rsQuotaManager;
  private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

  /**
   * Nonce manager. Nonces are used to make operations like increment and append idempotent in the
   * case where client doesn't receive the response from a successful operation and retries. We
   * track the successful ops for some time via a nonce sent by client and handle duplicate
   * operations (currently, by failing them; in future we might use MVCC to return result). Nonces
   * are also recovered from WAL during recovery; however, the caveats (from HBASE-3787) are: - WAL
   * recovery is optimized, and under high load we won't read nearly nonce-timeout worth of past
   * records. If we don't read the records, we don't read and recover the nonces. Some WALs within
   * nonce-timeout at recovery may not even be present due to rolling/cleanup. - There's no WAL
   * recovery during normal region move, so nonces will not be transferred. We can have a separate
   * additional "Nonce WAL". It will just contain a bunch of numbers and won't be flushed on main
   * path - because WAL itself also contains nonces, if we only flush it before memstore flush, for
   * a given nonce we will either see it in the WAL (if it was never flushed to disk, it will be
   * part of recovery), or we'll see it as part of the nonce log (or both occasionally, which
   * doesn't matter). Nonce log file can be deleted after the latest nonce in it expired. It can
   * also be recovered during move.
   */
  final ServerNonceManager nonceManager;

  private UserProvider userProvider;

  protected final RSRpcServices rpcServices;

  private CoordinatedStateManager csm;

  /**
   * Configuration manager is used to register/deregister and notify the configuration observers
   * when the regionserver is notified that there was a change in the on disk configs.
   */
  protected final ConfigurationManager configurationManager;

  private BrokenStoreFileCleaner brokenStoreFileCleaner;

  private RSMobFileCleanerChore rsMobFileCleanerChore;

  @InterfaceAudience.Private
  CompactedHFilesDischarger compactedFileDischarger;

  private volatile ThroughputController flushThroughputController;

  private SecureBulkLoadManager secureBulkLoadManager;

  private FileSystemUtilizationChore fsUtilizationChore;

  private final NettyEventLoopGroupConfig eventLoopGroupConfig;

  /**
   * Provide online slow log responses from ringbuffer
   */
  private NamedQueueRecorder namedQueueRecorder = null;

  private BootstrapNodeManager bootstrapNodeManager;
  /**
   * True if this RegionServer is coming up in a cluster where there is no Master; means it needs to
   * just come up and make do without a Master to talk to: e.g. in test or HRegionServer is doing
   * other than its usual duties: e.g. as a hollowed-out host whose only purpose is as a
   * Replication-stream sink; see HBASE-18846 for more. TODO: can this replace
   * {@link #TEST_SKIP_REPORTING_TRANSITION} ?
   */
  private final boolean masterless;
  private static final String MASTERLESS_CONFIG_NAME = "hbase.masterless";

  /** regionserver codec list **/
  private static final String REGIONSERVER_CODEC = "hbase.regionserver.codecs";

  // A timer to shutdown the process if abort takes too long
  private Timer abortMonitor;

  /*
   * Chore that creates replication marker rows.
   */
  private ReplicationMarkerChore replicationMarkerChore;

  // A timer that submits requests to the PrefetchExecutor
  private PrefetchExecutorNotifier prefetchExecutorNotifier;

  /**
   * Starts a HRegionServer at the default location.
   * <p/>
   * Don't start any services or managers in here in the Constructor. Defer till after we register
   * with the Master as much as possible. See {@link #startServices}.
   */
  public HRegionServer(final Configuration conf) throws IOException {
    super("RegionServer"); // thread name
    final Span span = TraceUtil.createSpan("HRegionServer.cxtor");
    try (Scope ignored = span.makeCurrent()) {
      this.startcode = EnvironmentEdgeManager.currentTime();
      this.conf = conf;
      this.dataFsOk = true;
      this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
      this.eventLoopGroupConfig = setupNetty(this.conf);
      MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
      HFile.checkHFileVersion(this.conf);
      checkCodecs(this.conf);
      this.userProvider = UserProvider.instantiate(conf);
      FSUtils.setupShortCircuitRead(this.conf);

      // Disable usage of meta replicas in the regionserver
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
      // Config'ed params
      this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
      this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
      this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

      this.sleeper = new Sleeper(this.msgInterval, this);

      boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
      this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

      this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

      this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

      this.retryPauseTime = conf.getLong(HConstants.HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME,
        HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);

      this.abortRequested = new AtomicBoolean(false);
      this.stopped = false;

      this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
      rpcServices = createRpcServices();
      useThisHostnameInstead = getUseThisHostnameInstead(conf);

      // if use-ip is enabled, we will use ip to expose Master/RS service for client,
      // see HBASE-27304 for details.
      boolean useIp = conf.getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
        HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
      String isaHostName =
        useIp ? rpcServices.isa.getAddress().getHostAddress() : rpcServices.isa.getHostName();
      String hostName =
        StringUtils.isBlank(useThisHostnameInstead) ? isaHostName : useThisHostnameInstead;
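      // Initial ServerName built from local information; it may be updated with the hostname the
      // Master passes us once we register (see the serverName field Javadoc above).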
      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

      rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
      rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf,
        clusterConnection == null ? null : clusterConnection.getConnectionMetrics());

      // login the zookeeper client principal (if using security)
      ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
      // login the server principal (if using secure Hadoop)
      login(userProvider, hostName);
      // init superusers and add the server principal (if using security)
      // or process owner as default super user.
      Superusers.initialize(conf);
      regionServerAccounting = new RegionServerAccounting(conf);

      boolean isMasterNotCarryTable =
        this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

      // no need to instantiate block cache and mob file cache when master not carry table
      if (!isMasterNotCarryTable) {
        blockCache = BlockCacheFactory.createBlockCache(conf);
        mobFileCache = new MobFileCache(conf);
      }

      rsSnapshotVerifier = new RSSnapshotVerifier(conf);

      uncaughtExceptionHandler =
        (t, e) -> abort("Uncaught exception in executorService thread " + t.getName(), e);

      initializeFileSystem();

      this.configurationManager = new ConfigurationManager();
      setupSignalHandlers();

      // Some unit tests don't need a cluster, so no zookeeper at all
      // Open connection to zookeeper and set primary watcher
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(), this,
        canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (
          conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
        ) {
          this.csm = new ZkCoordinatedStateManager(this);
        }

        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
      this.rpcServices.start(zooKeeper);
      this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
      // This violates 'no starting stuff in Constructor' but Master depends on the below chore
      // and executor being created and takes a different startup route. Lots of overlap between HRS
      // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
      // class HRS. TODO.
      int choreServiceInitialSize =
        conf.getInt(CHORE_SERVICE_INITIAL_POOL_SIZE, DEFAULT_CHORE_SERVICE_INITIAL_POOL_SIZE);
      this.choreService = new ChoreService(getName(), choreServiceInitialSize, true);
      this.executorService = new ExecutorService(getName());
      // Master expects Constructor to put up web servers. Ugh.
      putUpWebUI();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Make sure we log the exception. HRegionServer is often started via reflection and the
      // cause of failed startup is lost.
      TraceUtil.setError(span, t);
      LOG.error("Failed construction RegionServer", t);
      throw t;
    } finally {
      span.end();
    }
  }

  // HMaster should override this method to load the specific config for master
  protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
    String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
    if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (!StringUtils.isBlank(hostname)) {
        String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and "
          + UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set "
          + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while "
          + UNSAFE_RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        return rpcServices.isa.getHostName();
      }
    } else {
      return hostname;
    }
  }
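  // Illustrative configuration (values are examples, not defaults): setting
  // hbase.unsafe.regionserver.hostname=rs1.example.com makes this RS register under that name,
  // while hbase.unsafe.regionserver.hostname.disable.master.reversedns=true makes it report the
  // hostname of its RPC bind address instead of letting the Master resolve it via reverse DNS.
  // getUseThisHostnameInstead() above rejects setting both at once.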
  private void setupSignalHandlers() {
    if (!SystemUtils.IS_OS_WINDOWS) {
      HBasePlatformDependent.handle("HUP", (number, name) -> {
        try {
          updateConfiguration();
        } catch (IOException e) {
          LOG.error("Problem while reloading configuration", e);
        }
      });
    }
  }

  private static NettyEventLoopGroupConfig setupNetty(Configuration conf) {
    // Initialize netty event loop group at start as we may use it for rpc server, rpc client & WAL.
    NettyEventLoopGroupConfig nelgc = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, nelgc.group(), nelgc.clientChannelClass());
    return nelgc;
  }

  private void initializeFileSystem() throws IOException {
    // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
    // checksum verification enabled, then automatically switch off hdfs checksum verification.
    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
    String walDirUri = CommonFSUtils.getDirUri(this.conf,
      new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR))));
    // set WAL's uri
    if (walDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, walDirUri);
    }
    // init the WALFs
    this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.walRootDir = CommonFSUtils.getWALRootDir(this.conf);
    // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
    // underlying hadoop hdfs accessors will be going against wrong filesystem
    // (unless all is set to defaults).
    String rootDirUri =
      CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR)));
    if (rootDirUri != null) {
      CommonFSUtils.setFsDefault(this.conf, rootDirUri);
    }
    // init the filesystem
    this.dataFs = new HFileSystem(this.conf, useHBaseChecksum);
    this.dataRootDir = CommonFSUtils.getRootDir(this.conf);
    this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir,
      !canUpdateTableDescriptor(), cacheTableDescriptor());
  }

  protected void login(UserProvider user, String host) throws IOException {
    user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE,
      SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host);
  }

  /**
   * Wait for an active Master. See override in Master superclass for how it is used.
   */
  protected void waitForMasterActive() {
  }

  protected String getProcessName() {
    return REGIONSERVER;
  }

  protected boolean canCreateBaseZNode() {
    return this.masterless;
  }

  protected boolean canUpdateTableDescriptor() {
    return false;
  }

  protected boolean cacheTableDescriptor() {
    return false;
  }

  protected RSRpcServices createRpcServices() throws IOException {
    return new RSRpcServices(this);
  }

  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("rs-status", "/rs-status", RSStatusServlet.class);
    infoServer.setAttribute(REGIONSERVER, this);
  }

  protected Class<? extends HttpServlet> getDumpServlet() {
    return RSDumpServlet.class;
  }

  /**
   * Used by {@link RSDumpServlet} to generate debugging information.
   */
  public void dumpRowLocks(final PrintWriter out) {
    StringBuilder sb = new StringBuilder();
    for (HRegion region : getRegions()) {
      if (region.getLockedRows().size() > 0) {
        for (HRegion.RowLockContext rowLockContext : region.getLockedRows().values()) {
          sb.setLength(0);
          sb.append(region.getTableDescriptor().getTableName()).append(",")
            .append(region.getRegionInfo().getEncodedName()).append(",");
          sb.append(rowLockContext.toString());
          out.println(sb);
        }
      }
    }
  }

  @Override
  public boolean registerService(com.google.protobuf.Service instance) {
    /*
     * No stacking of instances is allowed for a single executorService name
     */
    com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
    if (coprocessorServiceHandlers.containsKey(serviceName)) {
      LOG.error("Coprocessor executorService " + serviceName
        + " already registered, rejecting request from " + instance);
      return false;
    }

    coprocessorServiceHandlers.put(serviceName, instance);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
        "Registered regionserver coprocessor executorService: executorService=" + serviceName);
    }
    return true;
  }

  protected ClusterConnection createClusterConnection() throws IOException {
    // Create a cluster connection that when appropriate, can short-circuit and go directly to the
    // local server if the request is to the local server bypassing RPC. Can be used for both local
    // and remote invocations.
    return ServerConnectionUtils.createShortCircuitConnection(conf, userProvider.getCurrent(),
      serverName, rpcServices, rpcServices, new RegionServerRegistry(this));
  }
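  // Example (value is illustrative): with hbase.regionserver.codecs=snappy,lz4 in hbase-site.xml,
  // checkCodecs() below fails construction fast when the libraries backing those codecs are
  // missing, rather than failing later when a region using that compression is opened.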
  /**
   * Run test on configured codecs to make sure supporting libs are in place.
   * @param c configuration object
   * @throws IOException if compression test fails for any regionserver codec
   */
  private static void checkCodecs(final Configuration c) throws IOException {
    // check to see if the codec list is available:
    String[] codecs = c.getStrings(REGIONSERVER_CODEC, (String[]) null);
    if (codecs == null) {
      return;
    }
    for (String codec : codecs) {
      if (!CompressionTest.testCompression(codec)) {
        throw new IOException(
          "Compression codec " + codec + " not supported, aborting RS construction");
      }
    }
  }

  public String getClusterId() {
    return this.clusterId;
  }

  /**
   * Setup our cluster connection if not already initialized.
   */
  protected synchronized void setupClusterConnection() throws IOException {
    if (clusterConnection == null) {
      clusterConnection = createClusterConnection();
    }
  }

  /**
   * All initialization needed before we go register with Master.<br>
   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
   */
  private void preRegistrationInitialization() {
    final Span span = TraceUtil.createSpan("HRegionServer.preRegistrationInitialization");
    try (Scope ignored = span.makeCurrent()) {
      initializeZooKeeper();
      setupClusterConnection();
      if (!(this instanceof HMaster)) {
        bootstrapNodeManager = new BootstrapNodeManager(clusterConnection, masterAddressTracker);
      }
      // Setup RPC client for master communication
      this.rpcClient = RpcClientFactory.createClient(conf, clusterId,
        new InetSocketAddress(this.rpcServices.isa.getAddress(), 0),
        clusterConnection.getConnectionMetrics(), Collections.emptyMap());
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      // Call stop if error or process will stick around for ever since server
      // puts up non-daemon threads.
      TraceUtil.setError(span, t);
      this.rpcServices.stop();
      abort("Initialization of RS failed. Hence aborting RS.", t);
    } finally {
      span.end();
    }
  }

  /**
   * Bring up the connection to the zk ensemble, then wait until a master for this cluster is
   * available, and after that wait until the cluster 'up' flag has been set. This is the order in
   * which the master does things.
   * <p>
   * Finally open long-living server short-circuit connection.
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE",
      justification = "cluster Id znode read would give us correct response")
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Nothing to do in here if no Master in the mix.
    if (this.masterless) {
      return;
    }

    // Create the master address tracker, register with zk, and start it. Then
    // block until a master is available. No point in starting up if no master
    // running.
    blockAndCheckIfStopped(this.masterAddressTracker);

    // Wait on cluster being up. Master will set this flag up in zookeeper
    // when ready.
    blockAndCheckIfStopped(this.clusterStatusTracker);

    // If we are HMaster then the cluster id should have already been set.
    if (clusterId == null) {
      // Retrieve clusterId
      // Since cluster status is now up
      // ID should have already been set by HMaster
      try {
        clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
        if (clusterId == null) {
          this.abort("Cluster ID has not been set");
        }
        LOG.info("ClusterId : " + clusterId);
      } catch (KeeperException e) {
        this.abort("Failed to retrieve Cluster ID", e);
      }
    }

    waitForMasterActive();
    if (isStopped() || isAborted()) {
      return; // No need for further initialization
    }

    // watch for snapshots and other procedures
    try {
      rspmHost = new RegionServerProcedureManagerHost();
      rspmHost.loadProcedures(conf);
      rspmHost.initialize(this);
    } catch (KeeperException e) {
      this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
    }
  }

  /**
   * Utility method to wait indefinitely on a znode availability while checking if the region
   * server is shut down
   * @param tracker znode tracker to use
   * @throws IOException any IO exception, plus if the RS is stopped
   * @throws InterruptedException if the waiting thread is interrupted
   */
  private void blockAndCheckIfStopped(ZKNodeTracker tracker)
    throws IOException, InterruptedException {
    while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
      if (this.stopped) {
        throw new IOException("Received the shutdown message while waiting.");
      }
    }
  }

  /** Returns True if the cluster is up. */
  @Override
  public boolean isClusterUp() {
    return this.masterless
      || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
  }

  private void initializeReplicationMarkerChore() {
    boolean replicationMarkerEnabled =
      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
    // Only create the chore when the replication marker feature is enabled.
    if (replicationMarkerEnabled) {
      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
      replicationMarkerChore = new ReplicationMarkerChore(this, this, period);
    }
  }

  /**
   * The HRegionServer sticks in this loop until closed.
   */
  @Override
  public void run() {
    if (isStopped()) {
      LOG.info("Skipping run; stopped");
      return;
    }
    try {
      installShutdownHook();
      // Do pre-registration initializations; zookeeper, lease threads, etc.
      preRegistrationInitialization();
    } catch (Throwable e) {
      abort("Fatal exception during initialization", e);
    }

    try {
      if (!isStopped() && !isAborted()) {
        // Initialize the RegionServerCoprocessorHost now that our ephemeral
        // node was created, in case any coprocessors want to use ZooKeeper
        this.rsHost = new RegionServerCoprocessorHost(this, this.conf);

        // Try and register with the Master; tell it we are here. Break if server is stopped or
        // the clusterup flag is down or hdfs went wacky. Once registered successfully, go ahead and
        // start up all Services. Use RetryCounter to get backoff in case Master is struggling to
        // come up.
        LOG.debug("About to register with Master.");
        TraceUtil.trace(() -> {
          RetryCounterFactory rcf =
            new RetryCounterFactory(Integer.MAX_VALUE, this.sleeper.getPeriod(), 1000 * 60 * 5);
          RetryCounter rc = rcf.create();
          while (keepLooping()) {
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
              long sleepTime = rc.getBackoffTimeAndIncrementAttempts();
              LOG.warn("reportForDuty failed; sleeping {} ms and then retrying.", sleepTime);
              this.sleeper.sleep(sleepTime);
            } else {
              handleReportForDutyResponse(w);
              break;
            }
          }
        }, "HRegionServer.registerWithMaster");
      }

      if (!isStopped() && isHealthy()) {
        TraceUtil.trace(() -> {
          // start the snapshot handler and other procedure handlers,
          // since the server is ready to run
          if (this.rspmHost != null) {
            this.rspmHost.start();
          }
          // Start the Quota Manager
          if (this.rsQuotaManager != null) {
            rsQuotaManager.start(getRpcServer().getScheduler());
          }
          if (this.rsSpaceQuotaManager != null) {
            this.rsSpaceQuotaManager.start();
          }
        }, "HRegionServer.startup");
      }

      // We registered with the Master. Go into run mode.
      long lastMsg = EnvironmentEdgeManager.currentTime();
      long oldRequestCount = -1;
      // The main run loop.
      while (!isStopped() && isHealthy()) {
        if (!isClusterUp()) {
          if (onlineRegions.isEmpty()) {
            stop("Exiting; cluster shutdown set and not carrying any regions");
          } else if (!this.stopping) {
            this.stopping = true;
            LOG.info("Closing user regions");
            closeUserRegions(this.abortRequested.get());
          } else {
            boolean allUserRegionsOffline = areAllUserRegionsOffline();
            if (allUserRegionsOffline) {
              // Set stopped if no more write requests to meta tables
              // since last time we went around the loop. Any open
              // meta regions will be closed on our way out.
              if (oldRequestCount == getWriteRequestCount()) {
                stop("Stopped; only catalog regions remaining online");
                break;
              }
              oldRequestCount = getWriteRequestCount();
            } else {
              // Make sure all regions have been closed -- some regions may
              // have not got it because we were splitting at the time of
              // the call to closeUserRegions.
              closeUserRegions(this.abortRequested.get());
            }
            LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
          }
        }
        long now = EnvironmentEdgeManager.currentTime();
        if ((now - lastMsg) >= msgInterval) {
          tryRegionServerReport(lastMsg, now);
          lastMsg = EnvironmentEdgeManager.currentTime();
        }
        if (!isStopped() && !isAborted()) {
          this.sleeper.sleep();
        }
      } // end of the main run loop
    } catch (Throwable t) {
      if (!rpcServices.checkOOME(t)) {
        String prefix = t instanceof YouAreDeadException ? "" : "Unhandled: ";
        abort(prefix + t.getMessage(), t);
      }
    }

    final Span span = TraceUtil.createSpan("HRegionServer exiting main loop");
    try (Scope ignored = span.makeCurrent()) {
      if (this.leaseManager != null) {
        this.leaseManager.closeAfterLeasesExpire();
      }
      if (this.splitLogWorker != null) {
        splitLogWorker.stop();
      }
      if (this.infoServer != null) {
        LOG.info("Stopping infoServer");
        try {
          this.infoServer.stop();
        } catch (Exception e) {
          LOG.error("Failed to stop infoServer", e);
        }
      }
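      // From here on, tear down in roughly dependency order: caches, memstore flusher and
      // compaction threads, procedure managers, regions, quota managers, WAL, RPC and ZooKeeper.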
      // Send cache a shutdown.
      if (blockCache != null) {
        blockCache.shutdown();
      }
      if (mobFileCache != null) {
        mobFileCache.shutdown();
      }

      // Send interrupts to wake up threads if sleeping so they notice shutdown.
      // TODO: Should we check they are alive? If OOME could have exited already
      if (this.hMemManager != null) {
        this.hMemManager.stop();
      }
      if (this.cacheFlusher != null) {
        this.cacheFlusher.interruptIfNecessary();
      }
      if (this.compactSplitThread != null) {
        this.compactSplitThread.interruptIfNecessary();
      }

      // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
      if (rspmHost != null) {
        rspmHost.stop(this.abortRequested.get() || this.killed);
      }

      if (this.killed) {
        // Just skip out w/o closing regions. Used when testing.
      } else if (abortRequested.get()) {
        if (this.dataFsOk) {
          closeUserRegions(abortRequested.get()); // Don't leave any open file handles
        }
        LOG.info("aborting server " + this.serverName);
      } else {
        closeUserRegions(abortRequested.get());
        LOG.info("stopping server " + this.serverName);
      }

      if (this.clusterConnection != null && !clusterConnection.isClosed()) {
        try {
          this.clusterConnection.close();
        } catch (IOException e) {
          // Although the {@link Closeable} interface throws an {@link
          // IOException}, in reality, the implementation would never do that.
          LOG.warn("Attempt to close server's short circuit ClusterConnection failed.", e);
        }
      }

      // Closing the compactSplit thread before closing meta regions
      if (!this.killed && containsMetaTableRegions()) {
        if (!abortRequested.get() || this.dataFsOk) {
          if (this.compactSplitThread != null) {
            this.compactSplitThread.join();
            this.compactSplitThread = null;
          }
          closeMetaTableRegions(abortRequested.get());
        }
      }

      if (!this.killed && this.dataFsOk) {
        waitOnAllRegionsToClose(abortRequested.get());
        LOG.info("stopping server " + this.serverName + "; all regions closed.");
      }

      // Stop the quota manager
      if (rsQuotaManager != null) {
        rsQuotaManager.stop();
      }
      if (rsSpaceQuotaManager != null) {
        rsSpaceQuotaManager.stop();
        rsSpaceQuotaManager = null;
      }

      // flag may be changed when closing regions throws exception.
      if (this.dataFsOk) {
        shutdownWAL(!abortRequested.get());
      }

      // Make sure the proxy is down.
      if (this.rssStub != null) {
        this.rssStub = null;
      }
      if (this.lockStub != null) {
        this.lockStub = null;
      }
      if (this.rpcClient != null) {
        this.rpcClient.close();
      }
      if (this.leaseManager != null) {
        this.leaseManager.close();
      }
      if (this.pauseMonitor != null) {
        this.pauseMonitor.stop();
      }

      if (!killed) {
        stopServiceThreads();
      }

      if (this.rpcServices != null) {
        this.rpcServices.stop();
      }

      try {
        deleteMyEphemeralNode();
      } catch (KeeperException.NoNodeException nn) {
        // pass
      } catch (KeeperException e) {
        LOG.warn("Failed deleting my ephemeral node", e);
      }
      // We may have failed to delete the znode at the previous step, but
      // we delete the file anyway: a second attempt to delete the znode is likely to fail again.
1301 ZNodeClearer.deleteMyEphemeralNodeOnDisk(); 1302 1303 if (this.zooKeeper != null) { 1304 this.zooKeeper.close(); 1305 } 1306 this.shutDown = true; 1307 LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed."); 1308 span.setStatus(StatusCode.OK); 1309 } finally { 1310 span.end(); 1311 } 1312 } 1313 1314 /** 1315 * This method is called when HMaster and HRegionServer are started. Please see to HBASE-26977 for 1316 * details. 1317 */ 1318 private void installShutdownHook() { 1319 ShutdownHook.install(conf, dataFs, this, Thread.currentThread()); 1320 isShutdownHookInstalled = true; 1321 } 1322 1323 /** 1324 * This method is used for testing. 1325 */ 1326 public boolean isShutdownHookInstalled() { 1327 return isShutdownHookInstalled; 1328 } 1329 1330 private boolean containsMetaTableRegions() { 1331 return onlineRegions.containsKey(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); 1332 } 1333 1334 private boolean areAllUserRegionsOffline() { 1335 if (getNumberOfOnlineRegions() > 2) { 1336 return false; 1337 } 1338 boolean allUserRegionsOffline = true; 1339 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1340 if (!e.getValue().getRegionInfo().isMetaRegion()) { 1341 allUserRegionsOffline = false; 1342 break; 1343 } 1344 } 1345 return allUserRegionsOffline; 1346 } 1347 1348 /** Returns Current write count for all online regions. */ 1349 private long getWriteRequestCount() { 1350 long writeCount = 0; 1351 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1352 writeCount += e.getValue().getWriteRequestsCount(); 1353 } 1354 return writeCount; 1355 } 1356 1357 @InterfaceAudience.Private 1358 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) 1359 throws IOException { 1360 RegionServerStatusService.BlockingInterface rss = rssStub; 1361 if (rss == null) { 1362 // the current server could be stopping. 1363 return; 1364 } 1365 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime); 1366 final Span span = TraceUtil.createSpan("HRegionServer.tryRegionServerReport"); 1367 try (Scope ignored = span.makeCurrent()) { 1368 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder(); 1369 request.setServer(ProtobufUtil.toServerName(this.serverName)); 1370 request.setLoad(sl); 1371 rss.regionServerReport(null, request.build()); 1372 span.setStatus(StatusCode.OK); 1373 } catch (ServiceException se) { 1374 IOException ioe = ProtobufUtil.getRemoteException(se); 1375 if (ioe instanceof YouAreDeadException) { 1376 // This will be caught and handled as a fatal error in run() 1377 TraceUtil.setError(span, ioe); 1378 throw ioe; 1379 } 1380 if (rssStub == rss) { 1381 rssStub = null; 1382 } 1383 TraceUtil.setError(span, se); 1384 // Couldn't connect to the master, get location from zk and reconnect 1385 // Method blocks until new master is found or we are stopped 1386 createRegionServerStatusStub(true); 1387 } finally { 1388 span.end(); 1389 } 1390 } 1391 1392 /** 1393 * Reports the given map of Regions and their size on the filesystem to the active Master. 1394 * @param regionSizeStore The store containing region sizes 1395 * @return false if FileSystemUtilizationChore should pause reporting to master. true otherwise 1396 */ 1397 public boolean reportRegionSizesForQuotas(RegionSizeStore regionSizeStore) { 1398 RegionServerStatusService.BlockingInterface rss = rssStub; 1399 if (rss == null) { 1400 // the current server could be stopping. 
1401 LOG.trace("Skipping Region size report to HMaster as stub is null"); 1402 return true; 1403 } 1404 try { 1405 buildReportAndSend(rss, regionSizeStore); 1406 } catch (ServiceException se) { 1407 IOException ioe = ProtobufUtil.getRemoteException(se); 1408 if (ioe instanceof PleaseHoldException) { 1409 LOG.trace("Failed to report region sizes to Master because it is initializing." 1410 + " This will be retried.", ioe); 1411 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 1412 return true; 1413 } 1414 if (rssStub == rss) { 1415 rssStub = null; 1416 } 1417 createRegionServerStatusStub(true); 1418 if (ioe instanceof DoNotRetryIOException) { 1419 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe; 1420 if (doNotRetryEx.getCause() != null) { 1421 Throwable t = doNotRetryEx.getCause(); 1422 if (t instanceof UnsupportedOperationException) { 1423 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying"); 1424 return false; 1425 } 1426 } 1427 } 1428 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe); 1429 } 1430 return true; 1431 } 1432 1433 /** 1434 * Builds the region size report and sends it to the master. Upon successful sending of the 1435 * report, the region sizes that were sent are marked as sent. 1436 * @param rss The stub to send to the Master 1437 * @param regionSizeStore The store containing region sizes 1438 */ 1439 private void buildReportAndSend(RegionServerStatusService.BlockingInterface rss, 1440 RegionSizeStore regionSizeStore) throws ServiceException { 1441 RegionSpaceUseReportRequest request = 1442 buildRegionSpaceUseReportRequest(Objects.requireNonNull(regionSizeStore)); 1443 rss.reportRegionSpaceUse(null, request); 1444 // Record the number of size reports sent 1445 if (metricsRegionServer != null) { 1446 metricsRegionServer.incrementNumRegionSizeReportsSent(regionSizeStore.size()); 1447 } 1448 } 1449 1450 /** 1451 * Builds a {@link RegionSpaceUseReportRequest} protobuf message from the region size map. 1452 * @param regionSizes The size in bytes of regions 1453 * @return The corresponding protocol buffer message. 1454 */ 1455 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(RegionSizeStore regionSizes) { 1456 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder(); 1457 for (Entry<RegionInfo, RegionSize> entry : regionSizes) { 1458 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue().getSize())); 1459 } 1460 return request.build(); 1461 } 1462 1463 /** 1464 * Converts a pair of {@link RegionInfo} and {@code long} into a {@link RegionSpaceUse} protobuf 1465 * message. 1466 * @param regionInfo The RegionInfo 1467 * @param sizeInBytes The size in bytes of the Region 1468 * @return The protocol buffer 1469 */ 1470 RegionSpaceUse convertRegionSize(RegionInfo regionInfo, Long sizeInBytes) { 1471 return RegionSpaceUse.newBuilder() 1472 .setRegionInfo(ProtobufUtil.toRegionInfo(Objects.requireNonNull(regionInfo))) 1473 .setRegionSize(Objects.requireNonNull(sizeInBytes)).build(); 1474 } 1475 1476 private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime) 1477 throws IOException { 1478 // We're getting the MetricsRegionServerWrapper here because the wrapper computes requests 1479 // per second, and other metrics As long as metrics are part of ServerLoad it's best to use 1480 // the wrapper to compute those numbers in one place. 
1481 // In the long term most of these should be moved off of ServerLoad and the heart beat. 1482 // Instead they should be stored in an HBase table so that external visibility into HBase is 1483 // improved; Additionally the load balancer will be able to take advantage of a more complete 1484 // history. 1485 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper(); 1486 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 1487 long usedMemory = -1L; 1488 long maxMemory = -1L; 1489 final MemoryUsage usage = MemorySizeUtil.safeGetHeapMemoryUsage(); 1490 if (usage != null) { 1491 usedMemory = usage.getUsed(); 1492 maxMemory = usage.getMax(); 1493 } 1494 1495 ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder(); 1496 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond()); 1497 serverLoad.setTotalNumberOfRequests(regionServerWrapper.getTotalRequestCount()); 1498 serverLoad.setUsedHeapMB((int) (usedMemory / 1024 / 1024)); 1499 serverLoad.setMaxHeapMB((int) (maxMemory / 1024 / 1024)); 1500 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors(); 1501 Coprocessor.Builder coprocessorBuilder = Coprocessor.newBuilder(); 1502 for (String coprocessor : coprocessors) { 1503 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1504 } 1505 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder(); 1506 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder(); 1507 for (HRegion region : regions) { 1508 if (region.getCoprocessorHost() != null) { 1509 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors(); 1510 for (String regionCoprocessor : regionCoprocessors) { 1511 serverLoad.addCoprocessors(coprocessorBuilder.setName(regionCoprocessor).build()); 1512 } 1513 } 1514 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier)); 1515 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost() 1516 .getCoprocessors()) { 1517 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build()); 1518 } 1519 } 1520 computeIfPersistentBucketCache(bc -> { 1521 bc.getRegionCachedInfo().forEach((regionName, prefetchSize) -> { 1522 serverLoad.putRegionCachedInfo(regionName, roundSize(prefetchSize, unitMB)); 1523 }); 1524 }); 1525 serverLoad.setReportStartTime(reportStartTime); 1526 serverLoad.setReportEndTime(reportEndTime); 1527 if (this.infoServer != null) { 1528 serverLoad.setInfoServerPort(this.infoServer.getPort()); 1529 } else { 1530 serverLoad.setInfoServerPort(-1); 1531 } 1532 MetricsUserAggregateSource userSource = 1533 metricsRegionServer.getMetricsUserAggregate().getSource(); 1534 if (userSource != null) { 1535 Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources(); 1536 for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) { 1537 serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue())); 1538 } 1539 } 1540 1541 // for the replicationLoad purpose. 
Only need to get from one executorService 1542 // either source or sink will get the same info 1543 ReplicationSourceService rsources = getReplicationSourceService(); 1544 if (rsources != null) { 1545 // always refresh first to get the latest value 1546 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); 1547 if (rLoad != null) { 1548 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); 1549 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad 1550 .getReplicationLoadSourceEntries()) { 1551 serverLoad.addReplLoadSource(rLS); 1552 } 1553 1554 } 1555 } 1556 1557 TaskMonitor.get().getTasks().forEach(task -> serverLoad.addTasks(ClusterStatusProtos.ServerTask 1558 .newBuilder().setDescription(task.getDescription()) 1559 .setStatus(task.getStatus() != null ? task.getStatus() : "") 1560 .setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) 1561 .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build())); 1562 1563 return serverLoad.build(); 1564 } 1565 1566 private String getOnlineRegionsAsPrintableString() { 1567 StringBuilder sb = new StringBuilder(); 1568 for (Region r : this.onlineRegions.values()) { 1569 if (sb.length() > 0) { 1570 sb.append(", "); 1571 } 1572 sb.append(r.getRegionInfo().getEncodedName()); 1573 } 1574 return sb.toString(); 1575 } 1576 1577 /** 1578 * Wait on regions close. 1579 */ 1580 private void waitOnAllRegionsToClose(final boolean abort) { 1581 // Wait till all regions are closed before going out. 1582 int lastCount = -1; 1583 long previousLogTime = 0; 1584 Set<String> closedRegions = new HashSet<>(); 1585 boolean interrupted = false; 1586 try { 1587 while (!onlineRegions.isEmpty()) { 1588 int count = getNumberOfOnlineRegions(); 1589 // Only print a message if the count of regions has changed. 1590 if (count != lastCount) { 1591 // Log every second at most 1592 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 1593 previousLogTime = EnvironmentEdgeManager.currentTime(); 1594 lastCount = count; 1595 LOG.info("Waiting on " + count + " regions to close"); 1596 // Only print out regions still closing if a small number else will 1597 // swamp the log. 1598 if (count < 10 && LOG.isDebugEnabled()) { 1599 LOG.debug("Online Regions=" + this.onlineRegions); 1600 } 1601 } 1602 } 1603 // Ensure all user regions have been sent a close. Use this to 1604 // protect against the case where an open comes in after we start the 1605 // iterator of onlineRegions to close all user regions. 1606 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 1607 RegionInfo hri = e.getValue().getRegionInfo(); 1608 if ( 1609 !this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes()) 1610 && !closedRegions.contains(hri.getEncodedName()) 1611 ) { 1612 closedRegions.add(hri.getEncodedName()); 1613 // Don't update zk with this close transition; pass false. 1614 closeRegionIgnoreErrors(hri, abort); 1615 } 1616 } 1617 // No regions in RIT, we could stop waiting now. 
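// If nothing is left in transition but some regions are still online, those regions failed to
// close; we log that and break out of the wait loop rather than spin forever.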
1618 if (this.regionsInTransitionInRS.isEmpty()) { 1619 if (!onlineRegions.isEmpty()) { 1620 LOG.info("Exiting though online regions are not empty," 1621 + " because some regions failed closing"); 1622 } 1623 break; 1624 } else { 1625 LOG.debug("Waiting on {}", this.regionsInTransitionInRS.keySet().stream() 1626 .map(e -> Bytes.toString(e)).collect(Collectors.joining(", "))); 1627 } 1628 if (sleepInterrupted(200)) { 1629 interrupted = true; 1630 } 1631 } 1632 } finally { 1633 if (interrupted) { 1634 Thread.currentThread().interrupt(); 1635 } 1636 } 1637 } 1638 1639 private static boolean sleepInterrupted(long millis) { 1640 boolean interrupted = false; 1641 try { 1642 Thread.sleep(millis); 1643 } catch (InterruptedException e) { 1644 LOG.warn("Interrupted while sleeping"); 1645 interrupted = true; 1646 } 1647 return interrupted; 1648 } 1649 1650 private void shutdownWAL(final boolean close) { 1651 if (this.walFactory != null) { 1652 try { 1653 if (close) { 1654 walFactory.close(); 1655 } else { 1656 walFactory.shutdown(); 1657 } 1658 } catch (Throwable e) { 1659 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 1660 LOG.error("Shutdown / close of WAL failed: " + e); 1661 LOG.debug("Shutdown / close exception details:", e); 1662 } 1663 } 1664 } 1665 1666 /** 1667 * Get the NamedQueueRecorder used to add different logs to the ring buffer. 1668 */ 1669 public NamedQueueRecorder getNamedQueueRecorder() { 1670 return this.namedQueueRecorder; 1671 } 1672 1673 /* 1674 * Run init. Sets up the WAL and starts up all server threads. 1675 * @param c Extra configuration. 1676 */ 1677 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 1678 throws IOException { 1679 try { 1680 boolean updateRootDir = false; 1681 for (NameStringPair e : c.getMapEntriesList()) { 1682 String key = e.getName(); 1683 // The hostname the master sees us as. 1684 if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { 1685 String hostnameFromMasterPOV = e.getValue(); 1686 this.serverName = ServerName.valueOf(hostnameFromMasterPOV, 1687 rpcServices.getSocketAddress().getPort(), this.startcode); 1688 String expectedHostName = rpcServices.getSocketAddress().getHostName(); 1689 // If Master use-ip is enabled, RegionServer use-ip will be enabled by default even if it 1690 // is set to disabled, so we will use the IP of the RegionServer to compare with the 1691 // hostname passed by the Master; see HBASE-27304 for details. 1692 if ( 1693 StringUtils.isBlank(useThisHostnameInstead) && getActiveMaster().isPresent() 1694 && InetAddresses.isInetAddress(getActiveMaster().get().getHostname()) 1695 ) { 1696 expectedHostName = rpcServices.getSocketAddress().getAddress().getHostAddress(); 1697 } 1698 boolean isHostnameConsist = StringUtils.isBlank(useThisHostnameInstead) 1699 ? hostnameFromMasterPOV.equals(expectedHostName) 1700 : hostnameFromMasterPOV.equals(useThisHostnameInstead); 1701 if (!isHostnameConsist) { 1702 String msg = "Master passed us a different hostname to use; was=" 1703 + (StringUtils.isBlank(useThisHostnameInstead) 1704 ?
rpcServices.getSocketAddress().getHostName() 1705 : this.useThisHostnameInstead) 1706 + ", but now=" + hostnameFromMasterPOV; 1707 LOG.error(msg); 1708 throw new IOException(msg); 1709 } 1710 continue; 1711 } 1712 1713 String value = e.getValue(); 1714 if (key.equals(HConstants.HBASE_DIR)) { 1715 if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) { 1716 updateRootDir = true; 1717 } 1718 } 1719 1720 if (LOG.isDebugEnabled()) { 1721 LOG.debug("Config from master: " + key + "=" + value); 1722 } 1723 this.conf.set(key, value); 1724 } 1725 // Set our ephemeral znode up in zookeeper now we have a name. 1726 createMyEphemeralNode(); 1727 1728 if (updateRootDir) { 1729 // initialize file system by the config fs.defaultFS and hbase.rootdir from master 1730 initializeFileSystem(); 1731 } 1732 1733 // hack! Maps DFSClient => RegionServer for logs. HDFS made this 1734 // config param for task trackers, but we can piggyback off of it. 1735 if (this.conf.get("mapreduce.task.attempt.id") == null) { 1736 this.conf.set("mapreduce.task.attempt.id", "hb_rs_" + this.serverName.toString()); 1737 } 1738 1739 // Save it in a file, this will allow to see if we crash 1740 ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath()); 1741 1742 // This call sets up an initialized replication and WAL. Later we start it up. 1743 setupWALAndReplication(); 1744 // Init in here rather than in constructor after thread name has been set 1745 final MetricsTable metricsTable = 1746 new MetricsTable(new MetricsTableWrapperAggregateImpl(this)); 1747 this.metricsRegionServerImpl = new MetricsRegionServerWrapperImpl(this); 1748 this.metricsRegionServer = 1749 new MetricsRegionServer(metricsRegionServerImpl, conf, metricsTable); 1750 // Now that we have a metrics source, start the pause monitor 1751 this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource()); 1752 pauseMonitor.start(); 1753 1754 // There is a rare case where we do NOT want services to start. Check config. 1755 if (getConfiguration().getBoolean("hbase.regionserver.workers", true)) { 1756 startServices(); 1757 } 1758 // In here we start up the replication Service. Above we initialized it. TODO. Reconcile. 1759 // or make sense of it. 1760 startReplicationService(); 1761 1762 // Set up ZK 1763 LOG.info( 1764 "Serving as " + this.serverName + ", RpcServer on " + rpcServices.isa + ", sessionid=0x" 1765 + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); 1766 1767 // Wake up anyone waiting for this server to online 1768 synchronized (online) { 1769 online.set(true); 1770 online.notifyAll(); 1771 } 1772 } catch (Throwable e) { 1773 stop("Failed initialization"); 1774 throw convertThrowableToIOE(cleanup(e, "Failed init"), "Region server startup failed"); 1775 } finally { 1776 sleeper.skipSleepCycle(); 1777 } 1778 } 1779 1780 protected void initializeMemStoreChunkCreator() { 1781 if (MemStoreLAB.isEnabled(conf)) { 1782 // MSLAB is enabled. So initialize MemStoreChunkPool 1783 // By this time, the MemstoreFlusher is already initialized. We can get the global limits from 1784 // it. 1785 Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf); 1786 long globalMemStoreSize = pair.getFirst(); 1787 boolean offheap = this.regionServerAccounting.isOffheap(); 1788 // When off heap memstore in use, take full area for chunk pool. 1789 float poolSizePercentage = offheap 1790 ? 
1.0F 1791 : conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); 1792 float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, 1793 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); 1794 int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); 1795 float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY, 1796 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1797 // init the chunkCreator 1798 ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, 1799 initialCountPercentage, this.hMemManager, indexChunkSizePercent); 1800 } 1801 } 1802 1803 private void startHeapMemoryManager() { 1804 if (this.blockCache != null) { 1805 this.hMemManager = 1806 new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting); 1807 this.hMemManager.start(getChoreService()); 1808 } 1809 } 1810 1811 private void createMyEphemeralNode() throws KeeperException { 1812 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder(); 1813 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1); 1814 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo()); 1815 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray()); 1816 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(), data); 1817 } 1818 1819 private void deleteMyEphemeralNode() throws KeeperException { 1820 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath()); 1821 } 1822 1823 @Override 1824 public RegionServerAccounting getRegionServerAccounting() { 1825 return regionServerAccounting; 1826 } 1827 1828 // Round the size with KB or MB. 1829 // A trick here is that if the sizeInBytes is less than sizeUnit, we will round the size to 1 1830 // instead of 0 if it is not 0, to avoid some schedulers think the region has no data. See 1831 // HBASE-26340 for more details on why this is important. 1832 private static int roundSize(long sizeInByte, int sizeUnit) { 1833 if (sizeInByte == 0) { 1834 return 0; 1835 } else if (sizeInByte < sizeUnit) { 1836 return 1; 1837 } else { 1838 return (int) Math.min(sizeInByte / sizeUnit, Integer.MAX_VALUE); 1839 } 1840 } 1841 1842 private void computeIfPersistentBucketCache(Consumer<BucketCache> computation) { 1843 if (blockCache instanceof CombinedBlockCache) { 1844 BlockCache l2 = ((CombinedBlockCache) blockCache).getSecondLevelCache(); 1845 if (l2 instanceof BucketCache && ((BucketCache) l2).isCachePersistent()) { 1846 computation.accept((BucketCache) l2); 1847 } 1848 } 1849 } 1850 1851 /** 1852 * @param r Region to get RegionLoad for. 1853 * @param regionLoadBldr the RegionLoad.Builder, can be null 1854 * @param regionSpecifier the RegionSpecifier.Builder, can be null 1855 * @return RegionLoad instance. 
1856 */ 1857 RegionLoad createRegionLoad(final HRegion r, RegionLoad.Builder regionLoadBldr, 1858 RegionSpecifier.Builder regionSpecifier) throws IOException { 1859 byte[] name = r.getRegionInfo().getRegionName(); 1860 String regionEncodedName = r.getRegionInfo().getEncodedName(); 1861 int stores = 0; 1862 int storefiles = 0; 1863 int storeRefCount = 0; 1864 int maxCompactedStoreFileRefCount = 0; 1865 long storeUncompressedSize = 0L; 1866 long storefileSize = 0L; 1867 long storefileIndexSize = 0L; 1868 long rootLevelIndexSize = 0L; 1869 long totalStaticIndexSize = 0L; 1870 long totalStaticBloomSize = 0L; 1871 long totalCompactingKVs = 0L; 1872 long currentCompactedKVs = 0L; 1873 long totalRegionSize = 0L; 1874 List<HStore> storeList = r.getStores(); 1875 stores += storeList.size(); 1876 for (HStore store : storeList) { 1877 storefiles += store.getStorefilesCount(); 1878 int currentStoreRefCount = store.getStoreRefCount(); 1879 storeRefCount += currentStoreRefCount; 1880 int currentMaxCompactedStoreFileRefCount = store.getMaxCompactedStoreFileRefCount(); 1881 maxCompactedStoreFileRefCount = 1882 Math.max(maxCompactedStoreFileRefCount, currentMaxCompactedStoreFileRefCount); 1883 storeUncompressedSize += store.getStoreSizeUncompressed(); 1884 storefileSize += store.getStorefilesSize(); 1885 totalRegionSize += store.getHFilesSize(); 1886 // TODO: storefileIndexSizeKB is same with rootLevelIndexSizeKB? 1887 storefileIndexSize += store.getStorefilesRootLevelIndexSize(); 1888 CompactionProgress progress = store.getCompactionProgress(); 1889 if (progress != null) { 1890 totalCompactingKVs += progress.getTotalCompactingKVs(); 1891 currentCompactedKVs += progress.currentCompactedKVs; 1892 } 1893 rootLevelIndexSize += store.getStorefilesRootLevelIndexSize(); 1894 totalStaticIndexSize += store.getTotalStaticIndexSize(); 1895 totalStaticBloomSize += store.getTotalStaticBloomSize(); 1896 } 1897 1898 int memstoreSizeMB = roundSize(r.getMemStoreDataSize(), unitMB); 1899 int storeUncompressedSizeMB = roundSize(storeUncompressedSize, unitMB); 1900 int storefileSizeMB = roundSize(storefileSize, unitMB); 1901 int storefileIndexSizeKB = roundSize(storefileIndexSize, unitKB); 1902 int rootLevelIndexSizeKB = roundSize(rootLevelIndexSize, unitKB); 1903 int totalStaticIndexSizeKB = roundSize(totalStaticIndexSize, unitKB); 1904 int totalStaticBloomSizeKB = roundSize(totalStaticBloomSize, unitKB); 1905 int regionSizeMB = roundSize(totalRegionSize, unitMB); 1906 final MutableFloat currentRegionCachedRatio = new MutableFloat(0.0f); 1907 computeIfPersistentBucketCache(bc -> { 1908 if (bc.getRegionCachedInfo().containsKey(regionEncodedName)) { 1909 currentRegionCachedRatio.setValue(regionSizeMB == 0 1910 ? 
0.0f 1911 : (float) roundSize(bc.getRegionCachedInfo().get(regionEncodedName), unitMB) 1912 / regionSizeMB); 1913 } 1914 }); 1915 1916 HDFSBlocksDistribution hdfsBd = r.getHDFSBlocksDistribution(); 1917 float dataLocality = hdfsBd.getBlockLocalityIndex(serverName.getHostname()); 1918 float dataLocalityForSsd = hdfsBd.getBlockLocalityIndexForSsd(serverName.getHostname()); 1919 long blocksTotalWeight = hdfsBd.getUniqueBlocksTotalWeight(); 1920 long blocksLocalWeight = hdfsBd.getBlocksLocalWeight(serverName.getHostname()); 1921 long blocksLocalWithSsdWeight = hdfsBd.getBlocksLocalWithSsdWeight(serverName.getHostname()); 1922 if (regionLoadBldr == null) { 1923 regionLoadBldr = RegionLoad.newBuilder(); 1924 } 1925 if (regionSpecifier == null) { 1926 regionSpecifier = RegionSpecifier.newBuilder(); 1927 } 1928 1929 regionSpecifier.setType(RegionSpecifierType.REGION_NAME); 1930 regionSpecifier.setValue(UnsafeByteOperations.unsafeWrap(name)); 1931 regionLoadBldr.setRegionSpecifier(regionSpecifier.build()).setStores(stores) 1932 .setStorefiles(storefiles).setStoreRefCount(storeRefCount) 1933 .setMaxCompactedStoreFileRefCount(maxCompactedStoreFileRefCount) 1934 .setStoreUncompressedSizeMB(storeUncompressedSizeMB).setStorefileSizeMB(storefileSizeMB) 1935 .setMemStoreSizeMB(memstoreSizeMB).setStorefileIndexSizeKB(storefileIndexSizeKB) 1936 .setRootIndexSizeKB(rootLevelIndexSizeKB).setTotalStaticIndexSizeKB(totalStaticIndexSizeKB) 1937 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB) 1938 .setReadRequestsCount(r.getReadRequestsCount()) 1939 .setFilteredReadRequestsCount(r.getFilteredReadRequestsCount()) 1940 .setWriteRequestsCount(r.getWriteRequestsCount()).setTotalCompactingKVs(totalCompactingKVs) 1941 .setCurrentCompactedKVs(currentCompactedKVs).setDataLocality(dataLocality) 1942 .setDataLocalityForSsd(dataLocalityForSsd).setBlocksLocalWeight(blocksLocalWeight) 1943 .setBlocksLocalWithSsdWeight(blocksLocalWithSsdWeight).setBlocksTotalWeight(blocksTotalWeight) 1944 .setCompactionState(ProtobufUtil.createCompactionStateForRegionLoad(r.getCompactionState())) 1945 .setLastMajorCompactionTs(r.getOldestHfileTs(true)).setRegionSizeMB(regionSizeMB) 1946 .setCurrentRegionCachedRatio(currentRegionCachedRatio.floatValue()); 1947 r.setCompleteSequenceId(regionLoadBldr); 1948 return regionLoadBldr.build(); 1949 } 1950 1951 private UserLoad createUserLoad(String user, MetricsUserSource userSource) { 1952 UserLoad.Builder userLoadBldr = UserLoad.newBuilder(); 1953 userLoadBldr.setUserName(user); 1954 userSource.getClientMetrics().values().stream() 1955 .map(clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder() 1956 .setHostName(clientMetrics.getHostName()) 1957 .setWriteRequestsCount(clientMetrics.getWriteRequestsCount()) 1958 .setFilteredRequestsCount(clientMetrics.getFilteredReadRequests()) 1959 .setReadRequestsCount(clientMetrics.getReadRequestsCount()).build()) 1960 .forEach(userLoadBldr::addClientMetrics); 1961 return userLoadBldr.build(); 1962 } 1963 1964 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException { 1965 HRegion r = onlineRegions.get(encodedRegionName); 1966 return r != null ? createRegionLoad(r, null, null) : null; 1967 } 1968 1969 /** 1970 * Inner class that runs on a long period checking if regions need compaction. 
1971 */ 1972 private static class CompactionChecker extends ScheduledChore { 1973 private final HRegionServer instance; 1974 private final int majorCompactPriority; 1975 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE; 1976 // Iteration is 1-based rather than 0-based so we don't check for compaction 1977 // immediately upon region server startup 1978 private long iteration = 1; 1979 1980 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { 1981 super("CompactionChecker", stopper, sleepTime); 1982 this.instance = h; 1983 LOG.info(this.getName() + " runs every " + Duration.ofMillis(sleepTime)); 1984 1985 /* 1986 * MajorCompactPriority is configurable. If not set, the compaction will use default priority. 1987 */ 1988 this.majorCompactPriority = this.instance.conf 1989 .getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); 1990 } 1991 1992 @Override 1993 protected void chore() { 1994 for (HRegion hr : this.instance.onlineRegions.values()) { 1995 // If region is read only or compaction is disabled at table level, there's no need to 1996 // iterate through region's stores 1997 if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) { 1998 continue; 1999 } 2000 for (HStore s : hr.stores.values()) { 2001 try { 2002 long multiplier = s.getCompactionCheckMultiplier(); 2003 assert multiplier > 0; 2004 if (iteration % multiplier != 0) { 2005 continue; 2006 } 2007 if (s.needsCompaction()) { 2008 // Queue a compaction. Will recognize if major is needed. 2009 this.instance.compactSplitThread.requestSystemCompaction(hr, s, 2010 getName() + " requests compaction"); 2011 } else if (s.shouldPerformMajorCompaction()) { 2012 s.triggerMajorCompaction(); 2013 if ( 2014 majorCompactPriority == DEFAULT_PRIORITY 2015 || majorCompactPriority > hr.getCompactPriority() 2016 ) { 2017 this.instance.compactSplitThread.requestCompaction(hr, s, 2018 getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, 2019 CompactionLifeCycleTracker.DUMMY, null); 2020 } else { 2021 this.instance.compactSplitThread.requestCompaction(hr, s, 2022 getName() + " requests major compaction; use configured priority", 2023 this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); 2024 } 2025 } 2026 } catch (IOException e) { 2027 LOG.warn("Failed major compaction check on " + hr, e); 2028 } 2029 } 2030 } 2031 iteration = (iteration == Long.MAX_VALUE) ? 
0 : (iteration + 1); 2032 } 2033 } 2034 2035 private static class PeriodicMemStoreFlusher extends ScheduledChore { 2036 private final HRegionServer server; 2037 private final static int RANGE_OF_DELAY = 5 * 60; // 5 min in seconds 2038 private final static int MIN_DELAY_TIME = 0; // millisec 2039 private final long rangeOfDelayMs; 2040 2041 PeriodicMemStoreFlusher(int cacheFlushInterval, final HRegionServer server) { 2042 super("MemstoreFlusherChore", server, cacheFlushInterval); 2043 this.server = server; 2044 2045 final long configuredRangeOfDelay = server.getConfiguration() 2046 .getInt("hbase.regionserver.periodicmemstoreflusher.rangeofdelayseconds", RANGE_OF_DELAY); 2047 this.rangeOfDelayMs = TimeUnit.SECONDS.toMillis(configuredRangeOfDelay); 2048 } 2049 2050 @Override 2051 protected void chore() { 2052 final StringBuilder whyFlush = new StringBuilder(); 2053 for (HRegion r : this.server.onlineRegions.values()) { 2054 if (r == null) { 2055 continue; 2056 } 2057 if (r.shouldFlush(whyFlush)) { 2058 FlushRequester requester = server.getFlushRequester(); 2059 if (requester != null) { 2060 long delay = ThreadLocalRandom.current().nextLong(rangeOfDelayMs) + MIN_DELAY_TIME; 2061 // Throttle the flushes by putting a delay. If we don't throttle, and there 2062 // is a balanced write-load on the regions in a table, we might end up 2063 // overwhelming the filesystem with too many flushes at once. 2064 if (requester.requestDelayedFlush(r, delay)) { 2065 LOG.info("{} requesting flush of {} because {} after random delay {} ms", getName(), 2066 r.getRegionInfo().getRegionNameAsString(), whyFlush.toString(), delay); 2067 } 2068 } 2069 } 2070 } 2071 } 2072 } 2073 2074 /** 2075 * Report the status of the server. A server is online once all the startup is completed (setting 2076 * up filesystem, starting executorService threads, etc.). This method is designed mostly to be 2077 * useful in tests. 2078 * @return true if online, false if not. 2079 */ 2080 public boolean isOnline() { 2081 return online.get(); 2082 } 2083 2084 /** 2085 * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to 2086 * be hooked up to WAL. 2087 */ 2088 private void setupWALAndReplication() throws IOException { 2089 WALFactory factory = new WALFactory(conf, serverName.toString(), (Server) this); 2090 // TODO Replication make assumptions here based on the default filesystem impl 2091 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 2092 String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); 2093 2094 Path logDir = new Path(walRootDir, logName); 2095 LOG.debug("logDir={}", logDir); 2096 if (this.walFs.exists(logDir)) { 2097 throw new RegionServerRunningException( 2098 "Region server has already created directory at " + this.serverName.toString()); 2099 } 2100 // Always create wal directory as now we need this when master restarts to find out the live 2101 // region servers. 2102 if (!this.walFs.mkdirs(logDir)) { 2103 throw new IOException("Can not create wal directory " + logDir); 2104 } 2105 // Instantiate replication if replication enabled. Pass it the log directories. 
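// This only wires up the replication source and sink handlers; they are started later by
// startReplicationService().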
2106 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); 2107 2108 WALActionsListener walEventListener = getWALEventTrackerListener(conf); 2109 if (walEventListener != null && factory.getWALProvider() != null) { 2110 factory.getWALProvider().addWALActionsListener(walEventListener); 2111 } 2112 this.walFactory = factory; 2113 } 2114 2115 private WALActionsListener getWALEventTrackerListener(Configuration conf) { 2116 if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { 2117 WALEventTrackerListener listener = 2118 new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); 2119 return listener; 2120 } 2121 return null; 2122 } 2123 2124 /** 2125 * Start up replication source and sink handlers. 2126 */ 2127 private void startReplicationService() throws IOException { 2128 if ( 2129 this.replicationSourceHandler == this.replicationSinkHandler 2130 && this.replicationSourceHandler != null 2131 ) { 2132 this.replicationSourceHandler.startReplicationService(); 2133 } else { 2134 if (this.replicationSourceHandler != null) { 2135 this.replicationSourceHandler.startReplicationService(); 2136 } 2137 if (this.replicationSinkHandler != null) { 2138 this.replicationSinkHandler.startReplicationService(); 2139 } 2140 } 2141 } 2142 2143 /** Returns Master address tracker instance. */ 2144 public MasterAddressTracker getMasterAddressTracker() { 2145 return this.masterAddressTracker; 2146 } 2147 2148 /** 2149 * Start maintenance Threads, Server, Worker and lease checker threads. Start all threads we need 2150 * to run. This is called after we've successfully registered with the Master. Install an 2151 * UncaughtExceptionHandler that calls abort of RegionServer if we get an unhandled exception. We 2152 * cannot set the handler on all threads. Server's internal Listener thread is off limits. For 2153 * Server, if an OOME, it waits a while then retries. Meantime, a flush or a compaction that tries 2154 * to run should trigger same critical condition and the shutdown will run. On its way out, this 2155 * server will shut down Server. Leases are sort of inbetween. It has an internal thread that 2156 * while it inherits from Chore, it keeps its own internal stop mechanism so needs to be stopped 2157 * by this hosting server. Worker logs the exception and exits. 2158 */ 2159 private void startServices() throws IOException { 2160 if (!isStopped() && !isAborted()) { 2161 initializeThreads(); 2162 } 2163 this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf, clusterConnection); 2164 this.secureBulkLoadManager.start(); 2165 2166 // Health checker thread. 2167 if (isHealthCheckerConfigured()) { 2168 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ, 2169 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY); 2170 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration()); 2171 } 2172 2173 this.walRoller = new LogRoller(this); 2174 this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf); 2175 this.procedureResultReporter = new RemoteProcedureResultReporter(this); 2176 2177 // Create the CompactedFileDischarger chore executorService. This chore helps to 2178 // remove the compacted files that will no longer be used in reads. 2179 // Default is 2 mins. 
The default value for TTLCleaner is 5 mins so we set this to 2180 // 2 mins so that compacted files can be archived before the TTLCleaner runs 2181 int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); 2182 this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, this, this); 2183 choreService.scheduleChore(compactedFileDischarger); 2184 2185 // Start executor services 2186 final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3); 2187 executorService.startExecutorService(executorService.new ExecutorConfig() 2188 .setExecutorType(ExecutorType.RS_OPEN_REGION).setCorePoolSize(openRegionThreads)); 2189 final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1); 2190 executorService.startExecutorService(executorService.new ExecutorConfig() 2191 .setExecutorType(ExecutorType.RS_OPEN_META).setCorePoolSize(openMetaThreads)); 2192 final int openPriorityRegionThreads = 2193 conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3); 2194 executorService.startExecutorService( 2195 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_OPEN_PRIORITY_REGION) 2196 .setCorePoolSize(openPriorityRegionThreads)); 2197 final int closeRegionThreads = 2198 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3); 2199 executorService.startExecutorService(executorService.new ExecutorConfig() 2200 .setExecutorType(ExecutorType.RS_CLOSE_REGION).setCorePoolSize(closeRegionThreads)); 2201 final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1); 2202 executorService.startExecutorService(executorService.new ExecutorConfig() 2203 .setExecutorType(ExecutorType.RS_CLOSE_META).setCorePoolSize(closeMetaThreads)); 2204 if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) { 2205 final int storeScannerParallelSeekThreads = 2206 conf.getInt("hbase.storescanner.parallel.seek.threads", 10); 2207 executorService.startExecutorService( 2208 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_PARALLEL_SEEK) 2209 .setCorePoolSize(storeScannerParallelSeekThreads).setAllowCoreThreadTimeout(true)); 2210 } 2211 final int logReplayOpsThreads = 2212 conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); 2213 executorService.startExecutorService( 2214 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_LOG_REPLAY_OPS) 2215 .setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true)); 2216 // Start the threads for compacted files discharger 2217 final int compactionDischargerThreads = 2218 conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10); 2219 executorService.startExecutorService(executorService.new ExecutorConfig() 2220 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER) 2221 .setCorePoolSize(compactionDischargerThreads)); 2222 if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) { 2223 final int regionReplicaFlushThreads = 2224 conf.getInt("hbase.regionserver.region.replica.flusher.threads", 2225 conf.getInt("hbase.regionserver.executor.openregion.threads", 3)); 2226 executorService.startExecutorService(executorService.new ExecutorConfig() 2227 .setExecutorType(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS) 2228 .setCorePoolSize(regionReplicaFlushThreads)); 2229 } 2230 final int refreshPeerThreads = 2231 conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2); 2232 
executorService.startExecutorService(executorService.new ExecutorConfig() 2233 .setExecutorType(ExecutorType.RS_REFRESH_PEER).setCorePoolSize(refreshPeerThreads)); 2234 2235 final int switchRpcThrottleThreads = 2236 conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1); 2237 executorService.startExecutorService( 2238 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SWITCH_RPC_THROTTLE) 2239 .setCorePoolSize(switchRpcThrottleThreads)); 2240 final int claimReplicationQueueThreads = 2241 conf.getInt("hbase.regionserver.executor.claim.replication.queue.threads", 1); 2242 executorService.startExecutorService( 2243 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_CLAIM_REPLICATION_QUEUE) 2244 .setCorePoolSize(claimReplicationQueueThreads)); 2245 final int rsSnapshotOperationThreads = 2246 conf.getInt("hbase.regionserver.executor.snapshot.operations.threads", 3); 2247 executorService.startExecutorService( 2248 executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OPERATIONS) 2249 .setCorePoolSize(rsSnapshotOperationThreads)); 2250 final int rsFlushOperationThreads = 2251 conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3); 2252 executorService.startExecutorService(executorService.new ExecutorConfig() 2253 .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS).setCorePoolSize(rsFlushOperationThreads)); 2254 2255 Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", 2256 uncaughtExceptionHandler); 2257 if (this.cacheFlusher != null) { 2258 this.cacheFlusher.start(uncaughtExceptionHandler); 2259 } 2260 Threads.setDaemonThreadRunning(this.procedureResultReporter, 2261 getName() + ".procedureResultReporter", uncaughtExceptionHandler); 2262 2263 if (this.compactionChecker != null) { 2264 choreService.scheduleChore(compactionChecker); 2265 } 2266 if (this.periodicFlusher != null) { 2267 choreService.scheduleChore(periodicFlusher); 2268 } 2269 if (this.healthCheckChore != null) { 2270 choreService.scheduleChore(healthCheckChore); 2271 } 2272 if (this.nonceManagerChore != null) { 2273 choreService.scheduleChore(nonceManagerChore); 2274 } 2275 if (this.storefileRefresher != null) { 2276 choreService.scheduleChore(storefileRefresher); 2277 } 2278 if (this.fsUtilizationChore != null) { 2279 choreService.scheduleChore(fsUtilizationChore); 2280 } 2281 if (this.namedQueueServiceChore != null) { 2282 choreService.scheduleChore(namedQueueServiceChore); 2283 } 2284 if (this.brokenStoreFileCleaner != null) { 2285 choreService.scheduleChore(brokenStoreFileCleaner); 2286 } 2287 if (this.rsMobFileCleanerChore != null) { 2288 choreService.scheduleChore(rsMobFileCleanerChore); 2289 } 2290 if (replicationMarkerChore != null) { 2291 LOG.info("Starting replication marker chore"); 2292 choreService.scheduleChore(replicationMarkerChore); 2293 } 2294 2295 // Leases is not a Thread. Internally it runs a daemon thread. If it gets 2296 // an unhandled exception, it will just exit. 2297 Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", 2298 uncaughtExceptionHandler); 2299 2300 // Create the log splitting worker and start it 2301 // set a smaller retries to fast fail otherwise splitlogworker could be blocked for 2302 // quite a while inside Connection layer. The worker won't be available for other 2303 // tasks even after current task is preempted after a split task times out. 
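// The reduced retry count and RPC timeout below are applied to a private copy of the
// configuration (sinkConf), so they only affect the SplitLogWorker and not the rest of the server.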
2304 Configuration sinkConf = HBaseConfiguration.create(conf); 2305 sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2306 conf.getInt("hbase.log.replay.retries.number", 8)); // 8 retries take about 23 seconds 2307 sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 2308 conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 2309 sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); 2310 if ( 2311 this.csm != null 2312 && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 2313 ) { 2314 // SplitLogWorker needs csm. If none, don't start this. 2315 this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); 2316 splitLogWorker.start(); 2317 LOG.debug("SplitLogWorker started"); 2318 } 2319 2320 // Memstore services. 2321 startHeapMemoryManager(); 2322 // Call it after starting HeapMemoryManager. 2323 initializeMemStoreChunkCreator(); 2324 } 2325 2326 private void initializeThreads() { 2327 // Cache flushing thread. 2328 this.cacheFlusher = new MemStoreFlusher(conf, this); 2329 2330 // Compaction thread 2331 this.compactSplitThread = new CompactSplit(this); 2332 2333 // Prefetch Notifier 2334 this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf); 2335 2336 // Background thread to check for compactions; needed if region has not gotten updates 2337 // in a while. It will take care of not checking too frequently on store-by-store basis. 2338 this.compactionChecker = new CompactionChecker(this, this.compactionCheckFrequency, this); 2339 this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this); 2340 this.leaseManager = new LeaseManager(this.threadWakeFrequency); 2341 2342 final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, 2343 HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); 2344 final boolean walEventTrackerEnabled = 2345 conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); 2346 2347 if (isSlowLogTableEnabled || walEventTrackerEnabled) { 2348 // default chore duration: 10 min 2349 // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property 2350 final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY, 2351 DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION); 2352 2353 final int namedQueueChoreDuration = 2354 conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT); 2355 // Considering min of slowLogChoreDuration and namedQueueChoreDuration 2356 int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration); 2357 2358 namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration, 2359 this.namedQueueRecorder, this.getConnection()); 2360 } 2361 2362 if (this.nonceManager != null) { 2363 // Create the scheduled chore that cleans up nonces. 
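// The chore is only created here; it is scheduled with the ChoreService later in startServices().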
2364 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this); 2365 } 2366 2367 // Setup the Quota Manager 2368 rsQuotaManager = new RegionServerRpcQuotaManager(this); 2369 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this); 2370 2371 if (QuotaUtil.isQuotaEnabled(conf)) { 2372 this.fsUtilizationChore = new FileSystemUtilizationChore(this); 2373 } 2374 2375 boolean onlyMetaRefresh = false; 2376 int storefileRefreshPeriod = 2377 conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 2378 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2379 if (storefileRefreshPeriod == 0) { 2380 storefileRefreshPeriod = 2381 conf.getInt(StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD, 2382 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD); 2383 onlyMetaRefresh = true; 2384 } 2385 if (storefileRefreshPeriod > 0) { 2386 this.storefileRefresher = 2387 new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); 2388 } 2389 2390 int brokenStoreFileCleanerPeriod = 2391 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 2392 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); 2393 int brokenStoreFileCleanerDelay = 2394 conf.getInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 2395 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); 2396 double brokenStoreFileCleanerDelayJitter = 2397 conf.getDouble(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, 2398 BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); 2399 double jitterRate = 2400 (ThreadLocalRandom.current().nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; 2401 long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); 2402 this.brokenStoreFileCleaner = 2403 new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), 2404 brokenStoreFileCleanerPeriod, this, conf, this); 2405 2406 this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); 2407 2408 registerConfigurationObservers(); 2409 initializeReplicationMarkerChore(); 2410 } 2411 2412 private void registerConfigurationObservers() { 2413 // Register Replication if possible, as now we support recreating replication peer storage, for 2414 // migrating across different replication peer storages online 2415 if (replicationSourceHandler instanceof ConfigurationObserver) { 2416 configurationManager.registerObserver((ConfigurationObserver) replicationSourceHandler); 2417 } 2418 if ( 2419 replicationSourceHandler != replicationSinkHandler 2420 && replicationSinkHandler instanceof ConfigurationObserver 2421 ) { 2422 configurationManager.registerObserver((ConfigurationObserver) replicationSinkHandler); 2423 } 2424 // Registering the compactSplitThread object with the ConfigurationManager. 2425 configurationManager.registerObserver(this.compactSplitThread); 2426 configurationManager.registerObserver(this.cacheFlusher); 2427 configurationManager.registerObserver(this.rpcServices); 2428 configurationManager.registerObserver(this.prefetchExecutorNotifier); 2429 configurationManager.registerObserver(this); 2430 } 2431 2432 /** 2433 * Puts up the webui. 
2434 */ 2435 private void putUpWebUI() throws IOException { 2436 int port = 2437 this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT); 2438 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); 2439 2440 if (this instanceof HMaster) { 2441 port = conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT); 2442 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); 2443 } 2444 // -1 is for disabling info server 2445 if (port < 0) { 2446 return; 2447 } 2448 2449 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { 2450 String msg = "Failed to start http info server. Address " + addr 2451 + " does not belong to this host. Correct configuration parameter: " 2452 + "hbase.regionserver.info.bindAddress"; 2453 LOG.error(msg); 2454 throw new IOException(msg); 2455 } 2456 // check if auto port bind enabled 2457 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); 2458 while (true) { 2459 try { 2460 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); 2461 infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet()); 2462 configureInfoServer(); 2463 this.infoServer.start(); 2464 break; 2465 } catch (BindException e) { 2466 if (!auto) { 2467 // auto bind disabled throw BindException 2468 LOG.error("Failed binding http info server to port: " + port); 2469 throw e; 2470 } 2471 // auto bind enabled, try to use another port 2472 LOG.info("Failed binding http info server to port: " + port); 2473 port++; 2474 LOG.info("Retry starting http info server with port: " + port); 2475 } 2476 } 2477 port = this.infoServer.getPort(); 2478 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); 2479 int masterInfoPort = 2480 conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT); 2481 conf.setInt("hbase.master.info.port.orig", masterInfoPort); 2482 conf.setInt(HConstants.MASTER_INFO_PORT, port); 2483 } 2484 2485 /* 2486 * Verify that server is healthy 2487 */ 2488 private boolean isHealthy() { 2489 if (!dataFsOk) { 2490 // File system problem 2491 return false; 2492 } 2493 // Verify that all threads are alive 2494 boolean healthy = (this.leaseManager == null || this.leaseManager.isAlive()) 2495 && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) 2496 && (this.walRoller == null || this.walRoller.isAlive()) 2497 && (this.compactionChecker == null || this.compactionChecker.isScheduled()) 2498 && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); 2499 if (!healthy) { 2500 stop("One or more threads are no longer alive -- stop"); 2501 } 2502 return healthy; 2503 } 2504 2505 @Override 2506 public List<WAL> getWALs() { 2507 return walFactory.getWALs(); 2508 } 2509 2510 @Override 2511 public WAL getWAL(RegionInfo regionInfo) throws IOException { 2512 try { 2513 WAL wal = walFactory.getWAL(regionInfo); 2514 if (this.walRoller != null) { 2515 this.walRoller.addWAL(wal); 2516 } 2517 return wal; 2518 } catch (FailedCloseWALAfterInitializedErrorException ex) { 2519 // see HBASE-21751 for details 2520 abort("WAL can not clean up after init failed", ex); 2521 throw ex; 2522 } 2523 } 2524 2525 public LogRoller getWalRoller() { 2526 return walRoller; 2527 } 2528 2529 WALFactory getWalFactory() { 2530 return walFactory; 2531 } 2532 2533 @Override 2534 public Connection getConnection() { 2535 return getClusterConnection(); 2536 } 2537 2538 @Override 2539 public ClusterConnection getClusterConnection() { 2540 return 
this.clusterConnection; 2541 } 2542 2543 @Override 2544 public void stop(final String msg) { 2545 stop(msg, false, RpcServer.getRequestUser().orElse(null)); 2546 } 2547 2548 /** 2549 * Stops the regionserver. 2550 * @param msg Status message 2551 * @param force True if this is a regionserver abort 2552 * @param user The user executing the stop request, or null if no user is associated 2553 */ 2554 public void stop(final String msg, final boolean force, final User user) { 2555 if (!this.stopped) { 2556 LOG.info("***** STOPPING region server '" + this + "' *****"); 2557 if (this.rsHost != null) { 2558 // when forced via abort don't allow CPs to override 2559 try { 2560 this.rsHost.preStop(msg, user); 2561 } catch (IOException ioe) { 2562 if (!force) { 2563 LOG.warn("The region server did not stop", ioe); 2564 return; 2565 } 2566 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe); 2567 } 2568 } 2569 this.stopped = true; 2570 LOG.info("STOPPED: " + msg); 2571 // Wakes run() if it is sleeping 2572 sleeper.skipSleepCycle(); 2573 } 2574 } 2575 2576 public void waitForServerOnline() { 2577 while (!isStopped() && !isOnline()) { 2578 synchronized (online) { 2579 try { 2580 online.wait(msgInterval); 2581 } catch (InterruptedException ie) { 2582 Thread.currentThread().interrupt(); 2583 break; 2584 } 2585 } 2586 } 2587 } 2588 2589 @Override 2590 public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException { 2591 HRegion r = context.getRegion(); 2592 long openProcId = context.getOpenProcId(); 2593 long masterSystemTime = context.getMasterSystemTime(); 2594 rpcServices.checkOpen(); 2595 LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}", 2596 r.getRegionInfo().getRegionNameAsString(), openProcId, masterSystemTime); 2597 // Do checks to see if we need to compact (references or too many files) 2598 for (HStore s : r.stores.values()) { 2599 if (s.hasReferences() || s.needsCompaction()) { 2600 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); 2601 } 2602 } 2603 long openSeqNum = r.getOpenSeqNum(); 2604 if (openSeqNum == HConstants.NO_SEQNUM) { 2605 // If we opened a region, we should have read some sequence number from it. 2606 LOG.error( 2607 "No sequence number found when opening " + r.getRegionInfo().getRegionNameAsString()); 2608 openSeqNum = 0; 2609 } 2610 2611 // Notify master 2612 if ( 2613 !reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED, 2614 openSeqNum, openProcId, masterSystemTime, r.getRegionInfo())) 2615 ) { 2616 throw new IOException( 2617 "Failed to report opened region to master: " + r.getRegionInfo().getRegionNameAsString()); 2618 } 2619 2620 triggerFlushInPrimaryRegion(r); 2621 2622 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString()); 2623 } 2624 2625 /** 2626 * Helper method for use in tests. Skip the region transition report when there's no master around 2627 * to receive it. 
2628 */ 2629 private boolean skipReportingTransition(final RegionStateTransitionContext context) { 2630 final TransitionCode code = context.getCode(); 2631 final long openSeqNum = context.getOpenSeqNum(); 2632 long masterSystemTime = context.getMasterSystemTime(); 2633 final RegionInfo[] hris = context.getHris(); 2634 2635 if (code == TransitionCode.OPENED) { 2636 Preconditions.checkArgument(hris != null && hris.length == 1); 2637 if (hris[0].isMetaRegion()) { 2638 LOG.warn( 2639 "meta table location is stored in master local store, so we can not skip reporting"); 2640 return false; 2641 } else { 2642 try { 2643 MetaTableAccessor.updateRegionLocation(clusterConnection, hris[0], serverName, openSeqNum, 2644 masterSystemTime); 2645 } catch (IOException e) { 2646 LOG.info("Failed to update meta", e); 2647 return false; 2648 } 2649 } 2650 } 2651 return true; 2652 } 2653 2654 private ReportRegionStateTransitionRequest 2655 createReportRegionStateTransitionRequest(final RegionStateTransitionContext context) { 2656 final TransitionCode code = context.getCode(); 2657 final long openSeqNum = context.getOpenSeqNum(); 2658 final RegionInfo[] hris = context.getHris(); 2659 final long[] procIds = context.getProcIds(); 2660 2661 ReportRegionStateTransitionRequest.Builder builder = 2662 ReportRegionStateTransitionRequest.newBuilder(); 2663 builder.setServer(ProtobufUtil.toServerName(serverName)); 2664 RegionStateTransition.Builder transition = builder.addTransitionBuilder(); 2665 transition.setTransitionCode(code); 2666 if (code == TransitionCode.OPENED && openSeqNum >= 0) { 2667 transition.setOpenSeqNum(openSeqNum); 2668 } 2669 for (RegionInfo hri : hris) { 2670 transition.addRegionInfo(ProtobufUtil.toRegionInfo(hri)); 2671 } 2672 for (long procId : procIds) { 2673 transition.addProcId(procId); 2674 } 2675 2676 return builder.build(); 2677 } 2678 2679 @Override 2680 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) { 2681 if (TEST_SKIP_REPORTING_TRANSITION) { 2682 return skipReportingTransition(context); 2683 } 2684 final ReportRegionStateTransitionRequest request = 2685 createReportRegionStateTransitionRequest(context); 2686 2687 int tries = 0; 2688 long pauseTime = this.retryPauseTime; 2689 // Keep looping till we get an error. We want to send reports even though server is going down. 2690 // Only go down if clusterConnection is null. It is set to null almost as last thing as the 2691 // HRegionServer does down. 2692 while (this.clusterConnection != null && !this.clusterConnection.isClosed()) { 2693 RegionServerStatusService.BlockingInterface rss = rssStub; 2694 try { 2695 if (rss == null) { 2696 createRegionServerStatusStub(); 2697 continue; 2698 } 2699 ReportRegionStateTransitionResponse response = 2700 rss.reportRegionStateTransition(null, request); 2701 if (response.hasErrorMessage()) { 2702 LOG.info("TRANSITION FAILED " + request + ": " + response.getErrorMessage()); 2703 break; 2704 } 2705 // Log if we had to retry else don't log unless TRACE. We want to 2706 // know if were successful after an attempt showed in logs as failed. 2707 if (tries > 0 || LOG.isTraceEnabled()) { 2708 LOG.info("TRANSITION REPORTED " + request); 2709 } 2710 // NOTE: Return mid-method!!! 
2711 return true;
2712 } catch (ServiceException se) {
2713 IOException ioe = ProtobufUtil.getRemoteException(se);
2714 boolean pause = ioe instanceof ServerNotRunningYetException
2715 || ioe instanceof PleaseHoldException || ioe instanceof CallQueueTooBigException;
2716 if (pause) {
2717 // Do backoff else we flood the Master with requests.
2718 pauseTime = ConnectionUtils.getPauseTime(this.retryPauseTime, tries);
2719 } else {
2720 pauseTime = this.retryPauseTime; // Reset.
2721 }
2722 LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#"
2723 + tries + ")"
2724 + (pause
2725 ? " after " + pauseTime + "ms delay (Master is coming online...)."
2726 : " immediately."),
2727 ioe);
2728 if (pause) {
2729 Threads.sleep(pauseTime);
2730 }
2731 tries++;
2732 if (rssStub == rss) {
2733 rssStub = null;
2734 }
2735 }
2736 }
2737 return false;
2738 }
2739
2740 /**
2741 * Trigger a flush in the primary region replica if this region is a secondary replica. Does not
2742 * block this thread. See RegionReplicaFlushHandler for details.
2743 */
2744 private void triggerFlushInPrimaryRegion(final HRegion region) {
2745 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2746 return;
2747 }
2748 TableName tn = region.getTableDescriptor().getTableName();
2749 if (
2750 !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn)
2751 || !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf) ||
2752 // If memstore replication is not set up, we do not have to wait for observing a flush event
2753 // from the primary before starting to serve reads, because gaps from replication are not
2754 // applicable; this logic comes from
2755 // TableDescriptorBuilder.ModifyableTableDescriptor.setRegionMemStoreReplication by
2756 // HBASE-13063
2757 !region.getTableDescriptor().hasRegionMemStoreReplication()
2758 ) {
2759 region.setReadsEnabled(true);
2760 return;
2761 }
2762
2763 region.setReadsEnabled(false); // disable reads before marking the region as opened.
2764 // RegionReplicaFlushHandler might reset this.
2765
2766 // Submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
2767 if (this.executorService != null) {
2768 this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
2769 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2770 } else {
2771 LOG.info("Executor is null; not running flush of primary region replica for {}",
2772 region.getRegionInfo());
2773 }
2774 }
2775
2776 @Override
2777 public RpcServerInterface getRpcServer() {
2778 return rpcServices.rpcServer;
2779 }
2780
2781 @InterfaceAudience.Private
2782 public RSRpcServices getRSRpcServices() {
2783 return rpcServices;
2784 }
2785
2786 /**
2787 * Cause the server to exit without closing the regions it is serving, the log it is using and
2788 * without notifying the master. Used in unit testing and on catastrophic events such as HDFS
2789 * being yanked out from under hbase or an OOME. {@code reason} is why we are aborting;
2790 * {@code cause} is the exception that caused the abort, or null.
2791 */
2792 @Override
2793 public void abort(String reason, Throwable cause) {
2794 if (!setAbortRequested()) {
2795 // Abort already in progress, ignore the new request.
2796 LOG.debug("Abort already in progress.
Ignoring the current request with reason: {}", reason); 2797 return; 2798 } 2799 String msg = "***** ABORTING region server " + this + ": " + reason + " *****"; 2800 if (cause != null) { 2801 LOG.error(HBaseMarkers.FATAL, msg, cause); 2802 } else { 2803 LOG.error(HBaseMarkers.FATAL, msg); 2804 } 2805 // HBASE-4014: show list of coprocessors that were loaded to help debug 2806 // regionserver crashes.Note that we're implicitly using 2807 // java.util.HashSet's toString() method to print the coprocessor names. 2808 LOG.error(HBaseMarkers.FATAL, 2809 "RegionServer abort: loaded coprocessors are: " + CoprocessorHost.getLoadedCoprocessors()); 2810 // Try and dump metrics if abort -- might give clue as to how fatal came about.... 2811 try { 2812 LOG.info("Dump of metrics as JSON on abort: " + DumpRegionServerMetrics.dumpMetrics()); 2813 } catch (MalformedObjectNameException | IOException e) { 2814 LOG.warn("Failed dumping metrics", e); 2815 } 2816 2817 // Do our best to report our abort to the master, but this may not work 2818 try { 2819 if (cause != null) { 2820 msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause); 2821 } 2822 // Report to the master but only if we have already registered with the master. 2823 RegionServerStatusService.BlockingInterface rss = rssStub; 2824 if (rss != null && this.serverName != null) { 2825 ReportRSFatalErrorRequest.Builder builder = ReportRSFatalErrorRequest.newBuilder(); 2826 builder.setServer(ProtobufUtil.toServerName(this.serverName)); 2827 builder.setErrorMessage(msg); 2828 rss.reportRSFatalError(null, builder.build()); 2829 } 2830 } catch (Throwable t) { 2831 LOG.warn("Unable to report fatal error to master", t); 2832 } 2833 2834 scheduleAbortTimer(); 2835 // shutdown should be run as the internal user 2836 stop(reason, true, null); 2837 } 2838 2839 /** 2840 * Sets the abort state if not already set. 2841 * @return True if abortRequested set to True successfully, false if an abort is already in 2842 * progress. 2843 */ 2844 protected boolean setAbortRequested() { 2845 return abortRequested.compareAndSet(false, true); 2846 } 2847 2848 @Override 2849 public boolean isAborted() { 2850 return abortRequested.get(); 2851 } 2852 2853 /* 2854 * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup logs but it does 2855 * close socket in case want to bring up server on old hostname+port immediately. 2856 */ 2857 @InterfaceAudience.Private 2858 protected void kill() { 2859 this.killed = true; 2860 abort("Simulated kill"); 2861 } 2862 2863 // Limits the time spent in the shutdown process. 2864 private void scheduleAbortTimer() { 2865 if (this.abortMonitor == null) { 2866 this.abortMonitor = new Timer("Abort regionserver monitor", true); 2867 TimerTask abortTimeoutTask = null; 2868 try { 2869 Constructor<? extends TimerTask> timerTaskCtor = 2870 Class.forName(conf.get(ABORT_TIMEOUT_TASK, SystemExitWhenAbortTimeout.class.getName())) 2871 .asSubclass(TimerTask.class).getDeclaredConstructor(); 2872 timerTaskCtor.setAccessible(true); 2873 abortTimeoutTask = timerTaskCtor.newInstance(); 2874 } catch (Exception e) { 2875 LOG.warn("Initialize abort timeout task failed", e); 2876 } 2877 if (abortTimeoutTask != null) { 2878 abortMonitor.schedule(abortTimeoutTask, conf.getLong(ABORT_TIMEOUT, DEFAULT_ABORT_TIMEOUT)); 2879 } 2880 } 2881 } 2882 2883 protected final void shutdownChore(ScheduledChore chore) { 2884 if (chore != null) { 2885 chore.shutdown(); 2886 } 2887 } 2888 2889 /** 2890 * Wait on all threads to finish. 
Presumption is that all closes and stops have already been 2891 * called. 2892 */ 2893 protected void stopServiceThreads() { 2894 // clean up the scheduled chores 2895 if (this.choreService != null) { 2896 shutdownChore(nonceManagerChore); 2897 shutdownChore(compactionChecker); 2898 shutdownChore(compactedFileDischarger); 2899 shutdownChore(periodicFlusher); 2900 shutdownChore(healthCheckChore); 2901 shutdownChore(storefileRefresher); 2902 shutdownChore(fsUtilizationChore); 2903 shutdownChore(namedQueueServiceChore); 2904 shutdownChore(replicationMarkerChore); 2905 shutdownChore(rsMobFileCleanerChore); 2906 // cancel the remaining scheduled chores (in case we missed out any) 2907 // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any 2908 choreService.shutdown(); 2909 } 2910 if (bootstrapNodeManager != null) { 2911 bootstrapNodeManager.stop(); 2912 } 2913 if (this.cacheFlusher != null) { 2914 this.cacheFlusher.shutdown(); 2915 } 2916 if (this.walRoller != null) { 2917 this.walRoller.close(); 2918 } 2919 if (this.compactSplitThread != null) { 2920 this.compactSplitThread.join(); 2921 } 2922 if (this.executorService != null) { 2923 this.executorService.shutdown(); 2924 } 2925 if ( 2926 this.replicationSourceHandler != null 2927 && this.replicationSourceHandler == this.replicationSinkHandler 2928 ) { 2929 this.replicationSourceHandler.stopReplicationService(); 2930 } else { 2931 if (this.replicationSourceHandler != null) { 2932 this.replicationSourceHandler.stopReplicationService(); 2933 } 2934 if (this.replicationSinkHandler != null) { 2935 this.replicationSinkHandler.stopReplicationService(); 2936 } 2937 } 2938 } 2939 2940 /** Returns Return the object that implements the replication source executorService. */ 2941 @InterfaceAudience.Private 2942 public ReplicationSourceService getReplicationSourceService() { 2943 return replicationSourceHandler; 2944 } 2945 2946 /** Returns Return the object that implements the replication sink executorService. */ 2947 ReplicationSinkService getReplicationSinkService() { 2948 return replicationSinkHandler; 2949 } 2950 2951 /** 2952 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2953 * connection, the current rssStub must be null. Method will block until a master is available. 2954 * You can break from this block by requesting the server stop. 2955 * @return master + port, or null if server has been stopped 2956 */ 2957 private synchronized ServerName createRegionServerStatusStub() { 2958 // Create RS stub without refreshing the master node from ZK, use cached data 2959 return createRegionServerStatusStub(false); 2960 } 2961 2962 /** 2963 * Get the current master from ZooKeeper and open the RPC connection to it. To get a fresh 2964 * connection, the current rssStub must be null. Method will block until a master is available. 2965 * You can break from this block by requesting the server stop. 
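 * <p>
 * A minimal usage sketch, mirroring how {@link #reportForDuty()} uses this method (not a new API;
 * just the existing call with its null contract spelled out):
 * </p>
 * <pre>{@code
 * ServerName master = createRegionServerStatusStub(true); // re-read the master location from ZK
 * if (master == null) {
 *   // the server was asked to stop (or the cluster went down) while waiting for a master
 * }
 * }</pre>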
2966 * @param refresh If true then master address will be read from ZK, otherwise use cached data 2967 * @return master + port, or null if server has been stopped 2968 */ 2969 @InterfaceAudience.Private 2970 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 2971 if (rssStub != null) { 2972 return masterAddressTracker.getMasterAddress(); 2973 } 2974 ServerName sn = null; 2975 long previousLogTime = 0; 2976 RegionServerStatusService.BlockingInterface intRssStub = null; 2977 LockService.BlockingInterface intLockStub = null; 2978 boolean interrupted = false; 2979 try { 2980 while (keepLooping()) { 2981 sn = this.masterAddressTracker.getMasterAddress(refresh); 2982 if (sn == null) { 2983 if (!keepLooping()) { 2984 // give up with no connection. 2985 LOG.debug("No master found and cluster is stopped; bailing out"); 2986 return null; 2987 } 2988 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 2989 LOG.debug("No master found; retry"); 2990 previousLogTime = EnvironmentEdgeManager.currentTime(); 2991 } 2992 refresh = true; // let's try pull it from ZK directly 2993 if (sleepInterrupted(200)) { 2994 interrupted = true; 2995 } 2996 continue; 2997 } 2998 2999 // If we are on the active master, use the shortcut 3000 if (this instanceof HMaster && sn.equals(getServerName())) { 3001 intRssStub = ((HMaster) this).getMasterRpcServices(); 3002 intLockStub = ((HMaster) this).getMasterRpcServices(); 3003 break; 3004 } 3005 try { 3006 BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, 3007 userProvider.getCurrent(), shortOperationTimeout); 3008 intRssStub = RegionServerStatusService.newBlockingStub(channel); 3009 intLockStub = LockService.newBlockingStub(channel); 3010 break; 3011 } catch (IOException e) { 3012 if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) { 3013 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 3014 if (e instanceof ServerNotRunningYetException) { 3015 LOG.info("Master isn't available yet, retrying"); 3016 } else { 3017 LOG.warn("Unable to connect to master. Retrying. Error was:", e); 3018 } 3019 previousLogTime = EnvironmentEdgeManager.currentTime(); 3020 } 3021 if (sleepInterrupted(200)) { 3022 interrupted = true; 3023 } 3024 } 3025 } 3026 } finally { 3027 if (interrupted) { 3028 Thread.currentThread().interrupt(); 3029 } 3030 } 3031 this.rssStub = intRssStub; 3032 this.lockStub = intLockStub; 3033 return sn; 3034 } 3035 3036 /** 3037 * @return True if we should break loop because cluster is going down or this server has been 3038 * stopped or hdfs has gone bad. 3039 */ 3040 private boolean keepLooping() { 3041 return !this.stopped && isClusterUp(); 3042 } 3043 3044 /* 3045 * Let the master know we're here Run initialization using parameters passed us by the master. 3046 * @return A Map of key/value configurations we got from the Master else null if we failed to 3047 * register. 
3048 */ 3049 private RegionServerStartupResponse reportForDuty() throws IOException { 3050 if (this.masterless) { 3051 return RegionServerStartupResponse.getDefaultInstance(); 3052 } 3053 ServerName masterServerName = createRegionServerStatusStub(true); 3054 RegionServerStatusService.BlockingInterface rss = rssStub; 3055 if (masterServerName == null || rss == null) { 3056 return null; 3057 } 3058 RegionServerStartupResponse result = null; 3059 try { 3060 rpcServices.requestCount.reset(); 3061 rpcServices.rpcGetRequestCount.reset(); 3062 rpcServices.rpcScanRequestCount.reset(); 3063 rpcServices.rpcFullScanRequestCount.reset(); 3064 rpcServices.rpcMultiRequestCount.reset(); 3065 rpcServices.rpcMutateRequestCount.reset(); 3066 LOG.info("reportForDuty to master=" + masterServerName + " with isa=" + rpcServices.isa 3067 + ", startcode=" + this.startcode); 3068 long now = EnvironmentEdgeManager.currentTime(); 3069 int port = rpcServices.isa.getPort(); 3070 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); 3071 if (!StringUtils.isBlank(useThisHostnameInstead)) { 3072 request.setUseThisHostnameInstead(useThisHostnameInstead); 3073 } 3074 request.setPort(port); 3075 request.setServerStartCode(this.startcode); 3076 request.setServerCurrentTime(now); 3077 result = rss.regionServerStartup(null, request.build()); 3078 } catch (ServiceException se) { 3079 IOException ioe = ProtobufUtil.getRemoteException(se); 3080 if (ioe instanceof ClockOutOfSyncException) { 3081 LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe); 3082 // Re-throw IOE will cause RS to abort 3083 throw ioe; 3084 } else if (ioe instanceof DecommissionedHostRejectedException) { 3085 LOG.error(HBaseMarkers.FATAL, 3086 "Master rejected startup because the host is considered decommissioned", ioe); 3087 // Re-throw IOE will cause RS to abort 3088 throw ioe; 3089 } else if (ioe instanceof ServerNotRunningYetException) { 3090 LOG.debug("Master is not running yet"); 3091 } else { 3092 LOG.warn("error telling master we are up", se); 3093 } 3094 rssStub = null; 3095 } 3096 return result; 3097 } 3098 3099 @Override 3100 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { 3101 try { 3102 GetLastFlushedSequenceIdRequest req = 3103 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName); 3104 RegionServerStatusService.BlockingInterface rss = rssStub; 3105 if (rss == null) { // Try to connect one more time 3106 createRegionServerStatusStub(); 3107 rss = rssStub; 3108 if (rss == null) { 3109 // Still no luck, we tried 3110 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id"); 3111 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 3112 .build(); 3113 } 3114 } 3115 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req); 3116 return RegionStoreSequenceIds.newBuilder() 3117 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId()) 3118 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build(); 3119 } catch (ServiceException e) { 3120 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e); 3121 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM) 3122 .build(); 3123 } 3124 } 3125 3126 /** 3127 * Close meta region if we carry it 3128 * @param abort Whether we're running an abort. 
3129 */ 3130 private void closeMetaTableRegions(final boolean abort) { 3131 HRegion meta = null; 3132 this.onlineRegionsLock.writeLock().lock(); 3133 try { 3134 for (Map.Entry<String, HRegion> e : onlineRegions.entrySet()) { 3135 RegionInfo hri = e.getValue().getRegionInfo(); 3136 if (hri.isMetaRegion()) { 3137 meta = e.getValue(); 3138 } 3139 if (meta != null) { 3140 break; 3141 } 3142 } 3143 } finally { 3144 this.onlineRegionsLock.writeLock().unlock(); 3145 } 3146 if (meta != null) { 3147 closeRegionIgnoreErrors(meta.getRegionInfo(), abort); 3148 } 3149 } 3150 3151 /** 3152 * Schedule closes on all user regions. Should be safe calling multiple times because it wont' 3153 * close regions that are already closed or that are closing. 3154 * @param abort Whether we're running an abort. 3155 */ 3156 private void closeUserRegions(final boolean abort) { 3157 this.onlineRegionsLock.writeLock().lock(); 3158 try { 3159 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) { 3160 HRegion r = e.getValue(); 3161 if (!r.getRegionInfo().isMetaRegion() && r.isAvailable()) { 3162 // Don't update zk with this close transition; pass false. 3163 closeRegionIgnoreErrors(r.getRegionInfo(), abort); 3164 } 3165 } 3166 } finally { 3167 this.onlineRegionsLock.writeLock().unlock(); 3168 } 3169 } 3170 3171 /** Returns the info server */ 3172 public InfoServer getInfoServer() { 3173 return infoServer; 3174 } 3175 3176 /** Returns true if a stop has been requested. */ 3177 @Override 3178 public boolean isStopped() { 3179 return this.stopped; 3180 } 3181 3182 @Override 3183 public boolean isStopping() { 3184 return this.stopping; 3185 } 3186 3187 @Override 3188 public Configuration getConfiguration() { 3189 return conf; 3190 } 3191 3192 protected Map<String, HRegion> getOnlineRegions() { 3193 return this.onlineRegions; 3194 } 3195 3196 public int getNumberOfOnlineRegions() { 3197 return this.onlineRegions.size(); 3198 } 3199 3200 /** 3201 * For tests, web ui and metrics. This method will only work if HRegionServer is in the same JVM 3202 * as client; HRegion cannot be serialized to cross an rpc. 3203 */ 3204 public Collection<HRegion> getOnlineRegionsLocalContext() { 3205 Collection<HRegion> regions = this.onlineRegions.values(); 3206 return Collections.unmodifiableCollection(regions); 3207 } 3208 3209 @Override 3210 public void addRegion(HRegion region) { 3211 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region); 3212 configurationManager.registerObserver(region); 3213 } 3214 3215 /** 3216 * @return A new Map of online regions sorted by region off-heap size with the first entry being 3217 * the biggest. If two regions are the same size, then the last one found wins; i.e. this 3218 * method may NOT return all regions. 3219 */ 3220 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOffHeapSize() { 3221 // we'll sort the regions in reverse 3222 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 3223 // Copy over all regions. Regions are sorted by size with biggest first. 3224 for (HRegion region : this.onlineRegions.values()) { 3225 sortedRegions.put(region.getMemStoreOffHeapSize(), region); 3226 } 3227 return sortedRegions; 3228 } 3229 3230 /** 3231 * @return A new Map of online regions sorted by region heap size with the first entry being the 3232 * biggest. If two regions are the same size, then the last one found wins; i.e. this 3233 * method may NOT return all regions. 
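 *         <p>
 *         Illustrative sketch of the size collision with hypothetical regions and sizes (not real
 *         data): because the map is keyed by size, two regions that report the same heap size
 *         collapse into a single entry.
 *         </p>
 *         <pre>{@code
 * SortedMap<Long, HRegion> byHeapSize = new TreeMap<>(Comparator.reverseOrder());
 * byHeapSize.put(128L, regionA);
 * byHeapSize.put(128L, regionB); // overwrites regionA; only regionB remains for key 128
 *         }</pre>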
3234 */ 3235 SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedByOnHeapSize() { 3236 // we'll sort the regions in reverse 3237 SortedMap<Long, HRegion> sortedRegions = new TreeMap<>(Comparator.reverseOrder()); 3238 // Copy over all regions. Regions are sorted by size with biggest first. 3239 for (HRegion region : this.onlineRegions.values()) { 3240 sortedRegions.put(region.getMemStoreHeapSize(), region); 3241 } 3242 return sortedRegions; 3243 } 3244 3245 /** Returns time stamp in millis of when this region server was started */ 3246 public long getStartcode() { 3247 return this.startcode; 3248 } 3249 3250 /** Returns reference to FlushRequester */ 3251 @Override 3252 public FlushRequester getFlushRequester() { 3253 return this.cacheFlusher; 3254 } 3255 3256 @Override 3257 public CompactionRequester getCompactionRequestor() { 3258 return this.compactSplitThread; 3259 } 3260 3261 @Override 3262 public LeaseManager getLeaseManager() { 3263 return leaseManager; 3264 } 3265 3266 /** Returns Return the rootDir. */ 3267 protected Path getDataRootDir() { 3268 return dataRootDir; 3269 } 3270 3271 @Override 3272 public FileSystem getFileSystem() { 3273 return dataFs; 3274 } 3275 3276 /** Returns {@code true} when the data file system is available, {@code false} otherwise. */ 3277 boolean isDataFileSystemOk() { 3278 return this.dataFsOk; 3279 } 3280 3281 /** Returns Return the walRootDir. */ 3282 public Path getWALRootDir() { 3283 return walRootDir; 3284 } 3285 3286 /** Returns Return the walFs. */ 3287 public FileSystem getWALFileSystem() { 3288 return walFs; 3289 } 3290 3291 @Override 3292 public String toString() { 3293 return getServerName().toString(); 3294 } 3295 3296 @Override 3297 public ZKWatcher getZooKeeper() { 3298 return zooKeeper; 3299 } 3300 3301 @Override 3302 public CoordinatedStateManager getCoordinatedStateManager() { 3303 return csm; 3304 } 3305 3306 @Override 3307 public ServerName getServerName() { 3308 return serverName; 3309 } 3310 3311 public RegionServerCoprocessorHost getRegionServerCoprocessorHost() { 3312 return this.rsHost; 3313 } 3314 3315 @Override 3316 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { 3317 return this.regionsInTransitionInRS; 3318 } 3319 3320 @Override 3321 public ExecutorService getExecutorService() { 3322 return executorService; 3323 } 3324 3325 @Override 3326 public ChoreService getChoreService() { 3327 return choreService; 3328 } 3329 3330 @Override 3331 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() { 3332 return rsQuotaManager; 3333 } 3334 3335 // 3336 // Main program and support routines 3337 // 3338 /** 3339 * Load the replication executorService objects, if any 3340 */ 3341 private static void createNewReplicationInstance(Configuration conf, HRegionServer server, 3342 FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) throws IOException { 3343 if ( 3344 (server instanceof HMaster) 3345 && (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf)) 3346 ) { 3347 return; 3348 } 3349 // read in the name of the source replication class from the config file. 3350 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME, 3351 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3352 3353 // read in the name of the sink replication class from the config file. 
3354 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME, 3355 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT); 3356 3357 // If both the sink and the source class names are the same, then instantiate 3358 // only one object. 3359 if (sourceClassname.equals(sinkClassname)) { 3360 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3361 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 3362 server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; 3363 } else { 3364 server.replicationSourceHandler = newReplicationInstance(sourceClassname, 3365 ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 3366 server.replicationSinkHandler = newReplicationInstance(sinkClassname, 3367 ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walFactory); 3368 } 3369 } 3370 3371 private static <T extends ReplicationService> T newReplicationInstance(String classname, 3372 Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir, 3373 Path oldLogDir, WALFactory walFactory) throws IOException { 3374 final Class<? extends T> clazz; 3375 try { 3376 ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); 3377 clazz = Class.forName(classname, true, classLoader).asSubclass(xface); 3378 } catch (java.lang.ClassNotFoundException nfe) { 3379 throw new IOException("Could not find class for " + classname); 3380 } 3381 T service = ReflectionUtils.newInstance(clazz, conf); 3382 service.initialize(server, walFs, logDir, oldLogDir, walFactory); 3383 return service; 3384 } 3385 3386 public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() { 3387 Map<String, ReplicationStatus> walGroupsReplicationStatus = new TreeMap<>(); 3388 if (!this.isOnline()) { 3389 return walGroupsReplicationStatus; 3390 } 3391 List<ReplicationSourceInterface> allSources = new ArrayList<>(); 3392 allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); 3393 allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); 3394 for (ReplicationSourceInterface source : allSources) { 3395 walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); 3396 } 3397 return walGroupsReplicationStatus; 3398 } 3399 3400 /** 3401 * Utility for constructing an instance of the passed HRegionServer class. 3402 */ 3403 static HRegionServer constructRegionServer(final Class<? extends HRegionServer> regionServerClass, 3404 final Configuration conf) { 3405 try { 3406 Constructor<? extends HRegionServer> c = 3407 regionServerClass.getConstructor(Configuration.class); 3408 return c.newInstance(conf); 3409 } catch (Exception e) { 3410 throw new RuntimeException( 3411 "Failed construction of " + "Regionserver: " + regionServerClass.toString(), e); 3412 } 3413 } 3414 3415 /** 3416 * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine 3417 */ 3418 public static void main(String[] args) { 3419 LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName()); 3420 VersionInfo.logVersion(); 3421 Configuration conf = HBaseConfiguration.create(); 3422 @SuppressWarnings("unchecked") 3423 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf 3424 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); 3425 3426 new HRegionServerCommandLine(regionServerClass).doMain(args); 3427 } 3428 3429 /** 3430 * Gets the online regions of the specified table. 
This method looks at the in-memory 3431 * onlineRegions. It does not go to <code>hbase:meta</code>. Only returns <em>online</em> regions. 3432 * If a region on this table has been closed during a disable, etc., it will not be included in 3433 * the returned list. So, the returned list may not necessarily be ALL regions in this table, its 3434 * all the ONLINE regions in the table. 3435 * @param tableName table to limit the scope of the query 3436 * @return Online regions from <code>tableName</code> 3437 */ 3438 @Override 3439 public List<HRegion> getRegions(TableName tableName) { 3440 List<HRegion> tableRegions = new ArrayList<>(); 3441 synchronized (this.onlineRegions) { 3442 for (HRegion region : this.onlineRegions.values()) { 3443 RegionInfo regionInfo = region.getRegionInfo(); 3444 if (regionInfo.getTable().equals(tableName)) { 3445 tableRegions.add(region); 3446 } 3447 } 3448 } 3449 return tableRegions; 3450 } 3451 3452 @Override 3453 public List<HRegion> getRegions() { 3454 List<HRegion> allRegions; 3455 synchronized (this.onlineRegions) { 3456 // Return a clone copy of the onlineRegions 3457 allRegions = new ArrayList<>(onlineRegions.values()); 3458 } 3459 return allRegions; 3460 } 3461 3462 /** 3463 * Gets the online tables in this RS. This method looks at the in-memory onlineRegions. 3464 * @return all the online tables in this RS 3465 */ 3466 public Set<TableName> getOnlineTables() { 3467 Set<TableName> tables = new HashSet<>(); 3468 synchronized (this.onlineRegions) { 3469 for (Region region : this.onlineRegions.values()) { 3470 tables.add(region.getTableDescriptor().getTableName()); 3471 } 3472 } 3473 return tables; 3474 } 3475 3476 public String[] getRegionServerCoprocessors() { 3477 TreeSet<String> coprocessors = new TreeSet<>(); 3478 try { 3479 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors()); 3480 } catch (IOException exception) { 3481 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " 3482 + "skipping."); 3483 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3484 } 3485 Collection<HRegion> regions = getOnlineRegionsLocalContext(); 3486 for (HRegion region : regions) { 3487 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors()); 3488 try { 3489 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors()); 3490 } catch (IOException exception) { 3491 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region 3492 + "; skipping."); 3493 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception); 3494 } 3495 } 3496 coprocessors.addAll(rsHost.getCoprocessors()); 3497 return coprocessors.toArray(new String[0]); 3498 } 3499 3500 /** 3501 * Try to close the region, logs a warning on failure but continues. 3502 * @param region Region to close 3503 */ 3504 private void closeRegionIgnoreErrors(RegionInfo region, final boolean abort) { 3505 try { 3506 if (!closeRegion(region.getEncodedName(), abort, null)) { 3507 LOG 3508 .warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing"); 3509 } 3510 } catch (IOException e) { 3511 LOG.warn("Failed to close " + region.getRegionNameAsString() + " - ignoring and continuing", 3512 e); 3513 } 3514 } 3515 3516 /** 3517 * Close asynchronously a region, can be called from the master or internally by the regionserver 3518 * when stopping. If called from the master, the region will update the status. 
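 * <p>
 * A minimal caller sketch (hypothetical; see closeRegionIgnoreErrors for a real caller). Here
 * {@code hri} stands for the {@link RegionInfo} of the region to close:
 * </p>
 * <pre>{@code
 * try {
 *   // schedule an asynchronous close, not part of an abort, destination unknown (null):
 *   boolean scheduled = closeRegion(hri.getEncodedName(), false, null);
 * } catch (NotServingRegionException e) {
 *   // the region is not online here (or a pending open was cancelled)
 * }
 * }</pre>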
3519 * <p>
3520 * If an opening was in progress, this method will cancel it, but will not start a new close. The
3521 * coprocessors are not called in this case. A NotServingRegionException is thrown.
3522 * </p>
3523 * <p>
3524 * If a close was in progress, this new request will be ignored, and an exception thrown.
3525 * </p>
3526 * <p>
3527 * Provides an additional flag to say whether this region's blocks should be evicted from the cache.
3528 * </p>
3529 * @param encodedName Region to close
3530 * @param abort True if we are aborting
3531 * @param destination Where the Region is being moved to... maybe null if unknown.
3532 * @return True if closed a region.
3533 * @throws NotServingRegionException if the region is not online
3534 */
3535 protected boolean closeRegion(String encodedName, final boolean abort,
3536 final ServerName destination) throws NotServingRegionException {
3537 // Check for permissions to close.
3538 HRegion actualRegion = this.getRegion(encodedName);
3539 // Can be null if we're calling close on a region that's not online
3540 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
3541 try {
3542 actualRegion.getCoprocessorHost().preClose(false);
3543 } catch (IOException exp) {
3544 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
3545 return false;
3546 }
3547 }
3548
3549 // previous can come back 'null' if not in map.
3550 final Boolean previous =
3551 this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName), Boolean.FALSE);
3552
3553 if (Boolean.TRUE.equals(previous)) {
3554 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already "
3555 + "trying to OPEN. Cancelling OPENING.");
3556 if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
3557 // The replace failed. That should be an exceptional case, but theoretically it can happen.
3558 // We're going to try to do a standard close then.
3559 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it."
3560 + " Doing a standard close now");
3561 return closeRegion(encodedName, abort, destination);
3562 }
3563 // Let's get the region from the online region list again
3564 actualRegion = this.getRegion(encodedName);
3565 if (actualRegion == null) { // If already online, we still need to close it.
3566 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3567 // The master deletes the znode when it receives this exception.
3568 throw new NotServingRegionException(
3569 "The region " + encodedName + " was opening but not yet served. Opening is cancelled.");
3570 }
3571 } else if (previous == null) {
3572 LOG.info("Received CLOSE for {}", encodedName);
3573 } else if (Boolean.FALSE.equals(previous)) {
3574 LOG.info("Received CLOSE for the region: " + encodedName
3575 + ", which we are already trying to CLOSE, but not completed yet");
3576 return true;
3577 }
3578
3579 if (actualRegion == null) {
3580 LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
3581 this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
3582 // The master deletes the znode when it receives this exception.
3583 throw new NotServingRegionException( 3584 "The region " + encodedName + " is not online, and is not opening."); 3585 } 3586 3587 CloseRegionHandler crh; 3588 final RegionInfo hri = actualRegion.getRegionInfo(); 3589 if (hri.isMetaRegion()) { 3590 crh = new CloseMetaHandler(this, this, hri, abort); 3591 } else { 3592 crh = new CloseRegionHandler(this, this, hri, abort, destination); 3593 } 3594 this.executorService.submit(crh); 3595 return true; 3596 } 3597 3598 /** 3599 * @return HRegion for the passed binary <code>regionName</code> or null if named region is not 3600 * member of the online regions. 3601 */ 3602 public HRegion getOnlineRegion(final byte[] regionName) { 3603 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3604 return this.onlineRegions.get(encodedRegionName); 3605 } 3606 3607 @Override 3608 public HRegion getRegion(final String encodedRegionName) { 3609 return this.onlineRegions.get(encodedRegionName); 3610 } 3611 3612 @Override 3613 public boolean removeRegion(final HRegion r, ServerName destination) { 3614 HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); 3615 metricsRegionServerImpl.requestsCountCache.remove(r.getRegionInfo().getEncodedName()); 3616 if (destination != null) { 3617 long closeSeqNum = r.getMaxFlushedSeqId(); 3618 if (closeSeqNum == HConstants.NO_SEQNUM) { 3619 // No edits in WAL for this region; get the sequence number when the region was opened. 3620 closeSeqNum = r.getOpenSeqNum(); 3621 if (closeSeqNum == HConstants.NO_SEQNUM) { 3622 closeSeqNum = 0; 3623 } 3624 } 3625 boolean selfMove = ServerName.isSameAddress(destination, this.getServerName()); 3626 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove); 3627 if (selfMove) { 3628 this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put( 3629 r.getRegionInfo().getEncodedName(), 3630 new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount())); 3631 } 3632 } 3633 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); 3634 return toReturn != null; 3635 } 3636 3637 /** 3638 * Protected Utility method for safely obtaining an HRegion handle. 3639 * @param regionName Name of online {@link HRegion} to return 3640 * @return {@link HRegion} for <code>regionName</code> 3641 */ 3642 protected HRegion getRegion(final byte[] regionName) throws NotServingRegionException { 3643 String encodedRegionName = RegionInfo.encodeRegionName(regionName); 3644 return getRegionByEncodedName(regionName, encodedRegionName); 3645 } 3646 3647 public HRegion getRegionByEncodedName(String encodedRegionName) throws NotServingRegionException { 3648 return getRegionByEncodedName(null, encodedRegionName); 3649 } 3650 3651 private HRegion getRegionByEncodedName(byte[] regionName, String encodedRegionName) 3652 throws NotServingRegionException { 3653 HRegion region = this.onlineRegions.get(encodedRegionName); 3654 if (region == null) { 3655 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName); 3656 if (moveInfo != null) { 3657 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum()); 3658 } 3659 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName)); 3660 String regionNameStr = 3661 regionName == null ? 
encodedRegionName : Bytes.toStringBinary(regionName); 3662 if (isOpening != null && isOpening) { 3663 throw new RegionOpeningException( 3664 "Region " + regionNameStr + " is opening on " + this.serverName); 3665 } 3666 throw new NotServingRegionException( 3667 "" + regionNameStr + " is not online on " + this.serverName); 3668 } 3669 return region; 3670 } 3671 3672 /** 3673 * Cleanup after Throwable caught invoking method. Converts <code>t</code> to IOE if it isn't 3674 * already. 3675 * @param t Throwable 3676 * @param msg Message to log in error. Can be null. 3677 * @return Throwable converted to an IOE; methods can only let out IOEs. 3678 */ 3679 private Throwable cleanup(final Throwable t, final String msg) { 3680 // Don't log as error if NSRE; NSRE is 'normal' operation. 3681 if (t instanceof NotServingRegionException) { 3682 LOG.debug("NotServingRegionException; " + t.getMessage()); 3683 return t; 3684 } 3685 Throwable e = t instanceof RemoteException ? ((RemoteException) t).unwrapRemoteException() : t; 3686 if (msg == null) { 3687 LOG.error("", e); 3688 } else { 3689 LOG.error(msg, e); 3690 } 3691 if (!rpcServices.checkOOME(t)) { 3692 checkFileSystem(); 3693 } 3694 return t; 3695 } 3696 3697 /** 3698 * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE 3699 * @return Make <code>t</code> an IOE if it isn't already. 3700 */ 3701 private IOException convertThrowableToIOE(final Throwable t, final String msg) { 3702 return (t instanceof IOException ? (IOException) t 3703 : msg == null || msg.length() == 0 ? new IOException(t) 3704 : new IOException(msg, t)); 3705 } 3706 3707 /** 3708 * Checks to see if the file system is still accessible. If not, sets abortRequested and 3709 * stopRequested 3710 * @return false if file system is not available 3711 */ 3712 boolean checkFileSystem() { 3713 if (this.dataFsOk && this.dataFs != null) { 3714 try { 3715 FSUtils.checkFileSystemAvailable(this.dataFs); 3716 } catch (IOException e) { 3717 abort("File System not available", e); 3718 this.dataFsOk = false; 3719 } 3720 } 3721 return this.dataFsOk; 3722 } 3723 3724 @Override 3725 public void updateRegionFavoredNodesMapping(String encodedRegionName, 3726 List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName> favoredNodes) { 3727 Address[] addr = new Address[favoredNodes.size()]; 3728 // Refer to the comment on the declaration of regionFavoredNodesMap on why 3729 // it is a map of region name to Address[] 3730 for (int i = 0; i < favoredNodes.size(); i++) { 3731 addr[i] = Address.fromParts(favoredNodes.get(i).getHostName(), favoredNodes.get(i).getPort()); 3732 } 3733 regionFavoredNodesMap.put(encodedRegionName, addr); 3734 } 3735 3736 /** 3737 * Return the favored nodes for a region given its encoded name. Look at the comment around 3738 * {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[] here. 3739 * @param encodedRegionName the encoded region name. 
3740 * @return array of favored locations 3741 */ 3742 @Override 3743 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { 3744 return Address.toSocketAddress(regionFavoredNodesMap.get(encodedRegionName)); 3745 } 3746 3747 @Override 3748 public ServerNonceManager getNonceManager() { 3749 return this.nonceManager; 3750 } 3751 3752 private static class MovedRegionInfo { 3753 private final ServerName serverName; 3754 private final long seqNum; 3755 3756 MovedRegionInfo(ServerName serverName, long closeSeqNum) { 3757 this.serverName = serverName; 3758 this.seqNum = closeSeqNum; 3759 } 3760 3761 public ServerName getServerName() { 3762 return serverName; 3763 } 3764 3765 public long getSeqNum() { 3766 return seqNum; 3767 } 3768 } 3769 3770 /** 3771 * We need a timeout. If not there is a risk of giving a wrong information: this would double the 3772 * number of network calls instead of reducing them. 3773 */ 3774 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000); 3775 3776 private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, 3777 boolean selfMove) { 3778 if (selfMove) { 3779 LOG.warn("Not adding moved region record: " + encodedName + " to self."); 3780 return; 3781 } 3782 LOG.info("Adding " + encodedName + " move to " + destination + " record at close sequenceid=" 3783 + closeSeqNum); 3784 movedRegionInfoCache.put(encodedName, new MovedRegionInfo(destination, closeSeqNum)); 3785 } 3786 3787 void removeFromMovedRegions(String encodedName) { 3788 movedRegionInfoCache.invalidate(encodedName); 3789 } 3790 3791 @InterfaceAudience.Private 3792 public MovedRegionInfo getMovedRegion(String encodedRegionName) { 3793 return movedRegionInfoCache.getIfPresent(encodedRegionName); 3794 } 3795 3796 @InterfaceAudience.Private 3797 public int movedRegionCacheExpiredTime() { 3798 return TIMEOUT_REGION_MOVED; 3799 } 3800 3801 private String getMyEphemeralNodePath() { 3802 return zooKeeper.getZNodePaths().getRsPath(serverName); 3803 } 3804 3805 private boolean isHealthCheckerConfigured() { 3806 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); 3807 return org.apache.commons.lang3.StringUtils.isNotBlank(healthScriptLocation); 3808 } 3809 3810 /** Returns the underlying {@link CompactSplit} for the servers */ 3811 public CompactSplit getCompactSplitThread() { 3812 return this.compactSplitThread; 3813 } 3814 3815 CoprocessorServiceResponse execRegionServerService( 3816 @SuppressWarnings("UnusedParameters") final RpcController controller, 3817 final CoprocessorServiceRequest serviceRequest) throws ServiceException { 3818 try { 3819 ServerRpcController serviceController = new ServerRpcController(); 3820 CoprocessorServiceCall call = serviceRequest.getCall(); 3821 String serviceName = call.getServiceName(); 3822 com.google.protobuf.Service service = coprocessorServiceHandlers.get(serviceName); 3823 if (service == null) { 3824 throw new UnknownProtocolException(null, 3825 "No registered coprocessor executorService found for " + serviceName); 3826 } 3827 com.google.protobuf.Descriptors.ServiceDescriptor serviceDesc = 3828 service.getDescriptorForType(); 3829 3830 String methodName = call.getMethodName(); 3831 com.google.protobuf.Descriptors.MethodDescriptor methodDesc = 3832 serviceDesc.findMethodByName(methodName); 3833 if (methodDesc == null) { 3834 throw new UnknownProtocolException(service.getClass(), 3835 "Unknown method " + methodName + " called on executorService " + serviceName); 3836 } 3837 3838 
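      // Deserialize the request payload for the resolved method, invoke the coprocessor endpoint
      // synchronously (the callback below merges the endpoint's reply into responseBuilder), and
      // rethrow any IOException the endpoint reported through the ServerRpcController.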
com.google.protobuf.Message request =
3839 CoprocessorRpcUtils.getRequest(service, methodDesc, call.getRequest());
3840 final com.google.protobuf.Message.Builder responseBuilder =
3841 service.getResponsePrototype(methodDesc).newBuilderForType();
3842 service.callMethod(methodDesc, serviceController, request, message -> {
3843 if (message != null) {
3844 responseBuilder.mergeFrom(message);
3845 }
3846 });
3847 IOException exception = CoprocessorRpcUtils.getControllerException(serviceController);
3848 if (exception != null) {
3849 throw exception;
3850 }
3851 return CoprocessorRpcUtils.getResponse(responseBuilder.build(), HConstants.EMPTY_BYTE_ARRAY);
3852 } catch (IOException ie) {
3853 throw new ServiceException(ie);
3854 }
3855 }
3856
3857 /**
3858 * May be empty if this is a master that does not carry tables.
3859 * @return The block cache instance used by the regionserver.
3860 */
3861 @Override
3862 public Optional<BlockCache> getBlockCache() {
3863 return Optional.ofNullable(this.blockCache);
3864 }
3865
3866 /**
3867 * May be empty if this is a master that does not carry tables.
3868 * @return The cache for mob files used by the regionserver.
3869 */
3870 @Override
3871 public Optional<MobFileCache> getMobFileCache() {
3872 return Optional.ofNullable(this.mobFileCache);
3873 }
3874
3875 @Override
3876 public AccessChecker getAccessChecker() {
3877 return rpcServices.getAccessChecker();
3878 }
3879
3880 @Override
3881 public ZKPermissionWatcher getZKPermissionWatcher() {
3882 return rpcServices.getZkPermissionWatcher();
3883 }
3884
3885 /** Returns the ConfigurationManager object for testing purposes. */
3886 @RestrictedApi(explanation = "Should only be called in tests", link = "",
3887 allowedOnPath = ".*/src/test/.*")
3888 public ConfigurationManager getConfigurationManager() {
3889 return configurationManager;
3890 }
3891
3892 /** Returns the table descriptors implementation. */
3893 @Override
3894 public TableDescriptors getTableDescriptors() {
3895 return this.tableDescriptors;
3896 }
3897
3898 /**
3899 * Reload the configuration from disk.
3900 */
3901 void updateConfiguration() throws IOException {
3902 LOG.info("Reloading the configuration from disk.");
3903 // Reload the configuration from disk.
3904 preUpdateConfiguration(); 3905 conf.reloadConfiguration(); 3906 configurationManager.notifyAllObservers(conf); 3907 postUpdateConfiguration(); 3908 } 3909 3910 protected void preUpdateConfiguration() throws IOException { 3911 if (rsHost != null) { 3912 rsHost.preUpdateConfiguration(conf); 3913 } 3914 } 3915 3916 protected void postUpdateConfiguration() throws IOException { 3917 if (rsHost != null) { 3918 rsHost.postUpdateConfiguration(conf); 3919 } 3920 } 3921 3922 CacheEvictionStats clearRegionBlockCache(Region region) { 3923 long evictedBlocks = 0; 3924 3925 for (Store store : region.getStores()) { 3926 for (StoreFile hFile : store.getStorefiles()) { 3927 evictedBlocks += blockCache.evictBlocksByHfileName(hFile.getPath().getName()); 3928 } 3929 } 3930 3931 return CacheEvictionStats.builder().withEvictedBlocks(evictedBlocks).build(); 3932 } 3933 3934 @Override 3935 public double getCompactionPressure() { 3936 double max = 0; 3937 for (Region region : onlineRegions.values()) { 3938 for (Store store : region.getStores()) { 3939 double normCount = store.getCompactionPressure(); 3940 if (normCount > max) { 3941 max = normCount; 3942 } 3943 } 3944 } 3945 return max; 3946 } 3947 3948 @Override 3949 public HeapMemoryManager getHeapMemoryManager() { 3950 return hMemManager; 3951 } 3952 3953 public MemStoreFlusher getMemStoreFlusher() { 3954 return cacheFlusher; 3955 } 3956 3957 /** 3958 * For testing 3959 * @return whether all wal roll request finished for this regionserver 3960 */ 3961 @InterfaceAudience.Private 3962 public boolean walRollRequestFinished() { 3963 return this.walRoller.walRollFinished(); 3964 } 3965 3966 @Override 3967 public ThroughputController getFlushThroughputController() { 3968 return flushThroughputController; 3969 } 3970 3971 @Override 3972 public double getFlushPressure() { 3973 if (getRegionServerAccounting() == null || cacheFlusher == null) { 3974 // return 0 during RS initialization 3975 return 0.0; 3976 } 3977 return getRegionServerAccounting().getFlushPressure(); 3978 } 3979 3980 @Override 3981 public void onConfigurationChange(Configuration newConf) { 3982 ThroughputController old = this.flushThroughputController; 3983 if (old != null) { 3984 old.stop("configuration change"); 3985 } 3986 this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf); 3987 try { 3988 Superusers.initialize(newConf); 3989 } catch (IOException e) { 3990 LOG.warn("Failed to initialize SuperUsers on reloading of the configuration"); 3991 } 3992 3993 // update region server coprocessor if the configuration has changed. 
3994 if ( 3995 CoprocessorConfigurationUtil.checkConfigurationChange(getConfiguration(), newConf, 3996 CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY) 3997 ) { 3998 LOG.info("Update region server coprocessors because the configuration has changed"); 3999 this.rsHost = new RegionServerCoprocessorHost(this, newConf); 4000 } 4001 } 4002 4003 @Override 4004 public MetricsRegionServer getMetrics() { 4005 return metricsRegionServer; 4006 } 4007 4008 @Override 4009 public SecureBulkLoadManager getSecureBulkLoadManager() { 4010 return this.secureBulkLoadManager; 4011 } 4012 4013 @Override 4014 public EntityLock regionLock(final List<RegionInfo> regionInfos, final String description, 4015 final Abortable abort) { 4016 return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) 4017 .regionLock(regionInfos, description, abort); 4018 } 4019 4020 @Override 4021 public void unassign(byte[] regionName) throws IOException { 4022 clusterConnection.getAdmin().unassign(regionName, false); 4023 } 4024 4025 @Override 4026 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { 4027 return this.rsSpaceQuotaManager; 4028 } 4029 4030 @Override 4031 public boolean reportFileArchivalForQuotas(TableName tableName, 4032 Collection<Entry<String, Long>> archivedFiles) { 4033 if (TEST_SKIP_REPORTING_TRANSITION) { 4034 return false; 4035 } 4036 RegionServerStatusService.BlockingInterface rss = rssStub; 4037 if (rss == null || rsSpaceQuotaManager == null) { 4038 // the current server could be stopping. 4039 LOG.trace("Skipping file archival reporting to HMaster as stub is null"); 4040 return false; 4041 } 4042 try { 4043 RegionServerStatusProtos.FileArchiveNotificationRequest request = 4044 rsSpaceQuotaManager.buildFileArchiveRequest(tableName, archivedFiles); 4045 rss.reportFileArchival(null, request); 4046 } catch (ServiceException se) { 4047 IOException ioe = ProtobufUtil.getRemoteException(se); 4048 if (ioe instanceof PleaseHoldException) { 4049 if (LOG.isTraceEnabled()) { 4050 LOG.trace("Failed to report file archival(s) to Master because it is initializing." 4051 + " This will be retried.", ioe); 4052 } 4053 // The Master is coming up. Will retry the report later. Avoid re-creating the stub. 4054 return false; 4055 } 4056 if (rssStub == rss) { 4057 rssStub = null; 4058 } 4059 // re-create the stub if we failed to report the archival 4060 createRegionServerStatusStub(true); 4061 LOG.debug("Failed to report file archival(s) to Master. 
This will be retried.", ioe);
4062 return false;
4063 }
4064 return true;
4065 }
4066
4067 public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
4068 return eventLoopGroupConfig;
4069 }
4070
4071 @Override
4072 public Connection createConnection(Configuration conf) throws IOException {
4073 User user = UserProvider.instantiate(conf).getCurrent();
4074 return ServerConnectionUtils.createShortCircuitConnection(conf, user, this.serverName,
4075 this.rpcServices, this.rpcServices, new RegionServerRegistry(this));
4076 }
4077
4078 void executeProcedure(long procId, RSProcedureCallable callable) {
4079 executorService.submit(new RSProcedureHandler(this, procId, callable));
4080 }
4081
4082 public void remoteProcedureComplete(long procId, Throwable error) {
4083 procedureResultReporter.complete(procId, error);
4084 }
4085
4086 void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException {
4087 RegionServerStatusService.BlockingInterface rss;
4088 // TODO: juggling class state with an instance variable, outside of a synchronized block :'(
4089 for (;;) {
4090 rss = rssStub;
4091 if (rss != null) {
4092 break;
4093 }
4094 createRegionServerStatusStub();
4095 }
4096 try {
4097 rss.reportProcedureDone(null, request);
4098 } catch (ServiceException se) {
4099 if (rssStub == rss) {
4100 rssStub = null;
4101 }
4102 throw ProtobufUtil.getRemoteException(se);
4103 }
4104 }
4105
4106 /**
4107 * Ignores open/close region procedures that were already submitted or executed. When the master
4108 * had an unfinished open/close region procedure and restarted, the new active master may send a
4109 * duplicate open/close region request to the regionserver. The open/close request is submitted to
4110 * a thread pool and executed, so we first need a cache of submitted open/close region procedures.
4111 * After the open/close region request has executed and the region transition has been reported
4112 * successfully, we cache it in the executed region procedures cache. See
4113 * {@link #finishRegionProcedure(long)}. After the region transition has been reported successfully,
4114 * the master will not send the open/close region request to the regionserver again. We assume an
4115 * ongoing duplicate open/close region request will not be delayed by more than 600 seconds, so the
4116 * executed region procedures cache expires after 600 seconds. See HBASE-22404 for more details.
4117 * @param procId the id of the open/close region procedure
4118 * @return true if the procedure can be submitted.
4119 */
4120 boolean submitRegionProcedure(long procId) {
4121 if (procId == -1) {
4122 return true;
4123 }
4124 // Ignore region procedures that were already submitted.
4125 Long previous = submittedRegionProcedures.putIfAbsent(procId, procId);
4126 if (previous != null) {
4127 LOG.warn("Received procedure pid={}, which already submitted, just ignore it", procId);
4128 return false;
4129 }
4130 // Ignore region procedures that have already executed.
4131 if (executedRegionProcedures.getIfPresent(procId) != null) {
4132 LOG.warn("Received procedure pid={}, which already executed, just ignore it", procId);
4133 return false;
4134 }
4135 return true;
4136 }
4137
4138 /**
4139 * See {@link #submitRegionProcedure(long)}.
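 * <p>
 * A sketch of the intended call pattern (illustrative only, not lifted from a specific caller):
 * </p>
 * <pre>{@code
 * if (submitRegionProcedure(procId)) {
 *   // ... execute the open/close request and report the region transition to the master ...
 *   finishRegionProcedure(procId); // remember the procedure as executed for ~600 seconds
 * }
 * }</pre>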
4140 * @param procId the id of the open/close region procedure
4141 */
4142 public void finishRegionProcedure(long procId) {
4143 executedRegionProcedures.put(procId, procId);
4144 submittedRegionProcedures.remove(procId);
4145 }
4146
4147 public boolean isShutDown() {
4148 return shutDown;
4149 }
4150
4151 /**
4152 * Forcibly terminates the region server when the abort times out.
4153 */
4154 private static class SystemExitWhenAbortTimeout extends TimerTask {
4155
4156 public SystemExitWhenAbortTimeout() {
4157 }
4158
4159 @Override
4160 public void run() {
4161 LOG.warn("Aborting region server timed out, terminating forcibly"
4162 + " and not waiting for any running shutdown hooks or finalizers to finish their work."
4163 + " Thread dump to stdout.");
4164 Threads.printThreadInfo(System.out, "Zombie HRegionServer");
4165 Runtime.getRuntime().halt(1);
4166 }
4167 }
4168
4169 @InterfaceAudience.Private
4170 public CompactedHFilesDischarger getCompactedHFilesDischarger() {
4171 return compactedFileDischarger;
4172 }
4173
4174 /**
4175 * Returns the pause time configured in {@link HConstants#HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME}
4176 * @return pause time
4177 */
4178 @InterfaceAudience.Private
4179 public long getRetryPauseTime() {
4180 return this.retryPauseTime;
4181 }
4182
4183 public Optional<ServerName> getActiveMaster() {
4184 return Optional.ofNullable(masterAddressTracker.getMasterAddress());
4185 }
4186
4187 public List<ServerName> getBackupMasters() {
4188 return masterAddressTracker.getBackupMasters();
4189 }
4190
4191 public Iterator<ServerName> getBootstrapNodes() {
4192 return bootstrapNodeManager.getBootstrapNodes().iterator();
4193 }
4194
4195 public MetaRegionLocationCache getMetaRegionLocationCache() {
4196 return this.metaRegionLocationCache;
4197 }
4198
4199 @InterfaceAudience.Private
4200 public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
4201 return brokenStoreFileCleaner;
4202 }
4203
4204 @InterfaceAudience.Private
4205 public RSMobFileCleanerChore getRSMobFileCleanerChore() {
4206 return rsMobFileCleanerChore;
4207 }
4208
4209 RSSnapshotVerifier getRsSnapshotVerifier() {
4210 return rsSnapshotVerifier;
4211 }
4212}