001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
021
022import java.io.IOException;
023import java.net.InetAddress;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.Objects;
031import java.util.Set;
032import java.util.concurrent.ConcurrentNavigableMap;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.CopyOnWriteArrayList;
035import java.util.concurrent.atomic.AtomicBoolean;
036import java.util.function.Predicate;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.ClockOutOfSyncException;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.NotServingRegionException;
041import org.apache.hadoop.hbase.RegionMetrics;
042import org.apache.hadoop.hbase.ServerMetrics;
043import org.apache.hadoop.hbase.ServerMetricsBuilder;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.YouAreDeadException;
046import org.apache.hadoop.hbase.client.ClusterConnection;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.conf.ConfigurationObserver;
049import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
050import org.apache.hadoop.hbase.ipc.HBaseRpcController;
051import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
052import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
053import org.apache.hadoop.hbase.monitoring.MonitoredTask;
054import org.apache.hadoop.hbase.procedure2.Procedure;
055import org.apache.hadoop.hbase.regionserver.HRegionServer;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.zookeeper.ZKUtil;
059import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
060import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
061import org.apache.yetus.audience.InterfaceAudience;
062import org.apache.zookeeper.KeeperException;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
067
068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
073
074/**
075 * The ServerManager class manages info about region servers.
076 * <p>
077 * Maintains lists of online and dead servers. Processes the startups, shutdowns, and deaths of
078 * region servers.
079 * <p>
 * Servers are distinguished in two different ways. A given server has a location, specified by
 * hostname and port, of which there can be only one online at any given time. A server instance
 * is specified by the location (hostname and port) as well as the startcode (timestamp from when
 * the server was started). The startcode differentiates a restarted instance of a given server
 * from the original instance.
 * <p>
 * If a server is known not to be running any more, it is called dead. A dead server needs to be
 * handled by a ServerShutdownHandler. If the handler is not enabled yet, the server cannot be
 * handled right away, so it is queued up. Once the handler is enabled, the server is submitted to
 * it for handling. If the handler is only partially enabled, the server cannot be fully processed
 * and is queued up for further processing. A server is fully processed only after the handler is
 * fully enabled and has completed the handling.
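 * <p>
 * A minimal illustration of how the startcode distinguishes a restarted instance from the
 * original one at the same location (the host name below is hypothetical):
 *
 * <pre>{@code
 * // Same host and port, different startcodes: same location, different server instances.
 * ServerName original = ServerName.valueOf("rs1.example.org", 16020, 1700000000000L);
 * ServerName restarted = ServerName.valueOf("rs1.example.org", 16020, 1700000123456L);
 * assert ServerName.isSameAddress(original, restarted); // same location
 * assert !original.equals(restarted);                    // different instances
 * }</pre>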
092 */
093@InterfaceAudience.Private
094public class ServerManager implements ConfigurationObserver {
095  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
096    "hbase.master.wait.on.regionservers.maxtostart";
097
098  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
099    "hbase.master.wait.on.regionservers.mintostart";
100
101  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
102    "hbase.master.wait.on.regionservers.timeout";
103
104  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
105    "hbase.master.wait.on.regionservers.interval";
106
107  private static final Logger LOG = LoggerFactory.getLogger(ServerManager.class);
108
109  // Set if we are to shutdown the cluster.
110  private AtomicBoolean clusterShutdown = new AtomicBoolean(false);
111
112  /**
113   * The last flushed sequence id for a region.
114   */
115  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
116    new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
117
118  /**
119   * The last flushed sequence id for a store in a region.
120   */
121  private final ConcurrentNavigableMap<byte[],
122    ConcurrentNavigableMap<byte[], Long>> storeFlushedSequenceIdsByRegion =
123      new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
124
125  /** Map of registered servers to their current load */
126  private final ConcurrentNavigableMap<ServerName, ServerMetrics> onlineServers =
127    new ConcurrentSkipListMap<>();
128
129  /** List of region servers that should not get any more new regions. */
130  private final ArrayList<ServerName> drainingServers = new ArrayList<>();
131
132  private final MasterServices master;
133  private final ClusterConnection connection;
134  private final RegionServerList storage;
135
136  private final DeadServer deadservers = new DeadServer();
137
138  private final long maxSkew;
139  private final long warningSkew;
140
141  private final RpcControllerFactory rpcControllerFactory;
142
143  /** Listeners that are called on server events. */
144  private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
145
146  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
147  private volatile boolean rejectDecommissionedHostsConfig;
148
149  /**
150   * Constructor.
151   */
152  public ServerManager(final MasterServices master, RegionServerList storage) {
153    this.master = master;
154    this.storage = storage;
155    Configuration c = master.getConfiguration();
156    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
157    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
158    this.connection = master.getClusterConnection();
159    this.rpcControllerFactory =
160      this.connection == null ? null : connection.getRpcControllerFactory();
161    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
162  }
163
164  /**
   * Implementation of the ConfigurationObserver interface. We are interested in live-reloading the
   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY.
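   * <p>
   * A minimal sketch of how a changed value reaches this observer, assuming {@code serverManager}
   * is this instance (variable names are illustrative only):
   *
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);
   * serverManager.onConfigurationChange(conf); // picked up without a master restart
   * }</pre>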
167   * @param conf Server configuration instance
168   */
169  @Override
170  public void onConfigurationChange(Configuration conf) {
171    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
172    if (rejectDecommissionedHostsConfig == newValue) {
173      // no-op
174      return;
175    }
176
177    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
178      rejectDecommissionedHostsConfig, newValue);
179
180    rejectDecommissionedHostsConfig = newValue;
181  }
182
183  /**
   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the configuration.
   * @param conf Configuration instance of the Master
   * @return the configured value, falling back to HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT
186   */
187  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
188    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
189      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
190  }
191
192  /**
193   * Add the listener to the notification list.
194   * @param listener The ServerListener to register
195   */
196  public void registerListener(final ServerListener listener) {
197    this.listeners.add(listener);
198  }
199
200  /**
201   * Remove the listener from the notification list.
202   * @param listener The ServerListener to unregister
203   */
204  public boolean unregisterListener(final ServerListener listener) {
205    return this.listeners.remove(listener);
206  }
207
208  /**
   * Let the server manager know a new regionserver has come online.
210   * @param request       the startup request
211   * @param versionNumber the version number of the new regionserver
212   * @param version       the version of the new regionserver, could contain strings like "SNAPSHOT"
   * @param ia            the InetAddress from which the request was received
214   * @return The ServerName we know this server as.
215   */
216  ServerName regionServerStartup(RegionServerStartupRequest request, int versionNumber,
217    String version, InetAddress ia) throws IOException {
218    // Test for case where we get a region startup message from a regionserver
219    // that has been quickly restarted but whose znode expiration handler has
220    // not yet run, or from a server whose fail we are currently processing.
221    // Test its host+port combo is present in serverAddressToServerInfo. If it
222    // is, reject the server and trigger its expiration. The next time it comes
223    // in, it should have been removed from serverAddressToServerInfo and queued
224    // for processing by ProcessServerShutdown.
225
226    // if use-ip is enabled, we will use ip to expose Master/RS service for client,
227    // see HBASE-27304 for details.
228    boolean useIp = master.getConfiguration().getBoolean(HConstants.HBASE_SERVER_USEIP_ENABLED_KEY,
229      HConstants.HBASE_SERVER_USEIP_ENABLED_DEFAULT);
230    String isaHostName = useIp ? ia.getHostAddress() : ia.getHostName();
231    final String hostname =
232      request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
233    ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
234
    // Check if the host should be rejected based on its decommissioned status
236    checkRejectableDecommissionedStatus(sn);
237
238    checkClockSkew(sn, request.getServerCurrentTime());
239    checkIsDead(sn, "STARTUP");
240    if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
241      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
242    }
243    storage.started(sn);
244    return sn;
245  }
246
247  /**
   * Updates last flushed sequence ids for the regions on server {@code sn}.
249   */
250  private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
251    for (Entry<byte[], RegionMetrics> entry : hsl.getRegionMetrics().entrySet()) {
252      byte[] encodedRegionName = Bytes.toBytes(RegionInfo.encodeRegionName(entry.getKey()));
253      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
254      long l = entry.getValue().getCompletedSequenceId();
255      // Don't let smaller sequence ids override greater sequence ids.
256      if (LOG.isTraceEnabled()) {
257        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue
258          + ", completeSequenceId=" + l);
259      }
260      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
261        flushedSequenceIdByRegion.put(encodedRegionName, l);
262      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
263        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id (" + l
264          + ") that is less than the previous last flushed sequence id (" + existingValue
265          + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
266      }
267      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
268        computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
269          () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
270      for (Entry<byte[], Long> storeSeqId : entry.getValue().getStoreSequenceId().entrySet()) {
271        byte[] family = storeSeqId.getKey();
272        existingValue = storeFlushedSequenceId.get(family);
273        l = storeSeqId.getValue();
274        if (LOG.isTraceEnabled()) {
275          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family)
276            + ", existingValue=" + existingValue + ", completeSequenceId=" + l);
277        }
278        // Don't let smaller sequence ids override greater sequence ids.
279        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
280          storeFlushedSequenceId.put(family, l);
281        }
282      }
283    }
284  }
285
286  public void regionServerReport(ServerName sn, ServerMetrics sl) throws YouAreDeadException {
287    checkIsDead(sn, "REPORT");
288    if (null == this.onlineServers.replace(sn, sl)) {
      // Already have this host+port combo and it's just a different start code?
290      // Just let the server in. Presume master joining a running cluster.
291      // recordNewServer is what happens at the end of reportServerStartup.
292      // The only thing we are skipping is passing back to the regionserver
293      // the ServerName to use. Here we presume a master has already done
294      // that so we'll press on with whatever it gave us for ServerName.
295      if (!checkAndRecordNewServer(sn, sl)) {
296        // Master already registered server with same (host + port) and higher startcode.
297        // This can happen if regionserver report comes late from old server
298        // (possible race condition), by that time master has already processed SCP for that
299        // server and started accepting regionserver report from new server i.e. server with
300        // same (host + port) and higher startcode.
301        // The exception thrown here is not meant to tell the region server it is dead because if
302        // there is a new server on the same host port, the old server should have already been
303        // dead in ideal situation.
304        // The exception thrown here is to skip the later steps of the whole regionServerReport
305        // request processing. Usually, after recording it in ServerManager, we will call the
306        // related methods in AssignmentManager to record region states. If the region server
307        // is already dead, we should not do these steps anymore, so here we throw an exception
308        // to let the upper layer know that they should not continue processing anymore.
309        final String errorMsg = "RegionServerReport received from " + sn
310          + ", but another server with the same name and higher startcode is already registered,"
311          + " ignoring";
312        LOG.warn(errorMsg);
313        throw new YouAreDeadException(errorMsg);
314      }
315    }
316    updateLastFlushedSequenceIds(sn, sl);
317  }
318
319  /**
   * Checks whether the Master is configured to reject decommissioned hosts. When it is configured
   * to do so, any RegionServer trying to join the cluster has its host checked against the list of
   * hosts of currently decommissioned servers and may be prevented from reporting for duty;
   * otherwise, we do nothing and let it pass to the next check. See HBASE-28342 for details.
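   * <p>
   * A rough sketch of the operator-side flow this check guards against, assuming the standard
   * {@code Admin} API and hypothetical names:
   *
   * <pre>{@code
   * // Decommission a server (offloading its regions) through the Admin API ...
   * admin.decommissionRegionServers(Collections.singletonList(serverName), true);
   * // ... later, a RegionServer starting on the same host is rejected while
   * // HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY is set to true.
   * }</pre>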
325   * @param sn The ServerName to check for
326   * @throws DecommissionedHostRejectedException if the Master is configured to reject
327   *                                             decommissioned hosts and this host exists in the
328   *                                             list of the decommissioned servers
329   */
330  private void checkRejectableDecommissionedStatus(ServerName sn)
331    throws DecommissionedHostRejectedException {
332    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());
333
334    // If the Master is not configured to reject decommissioned hosts, return early.
335    if (!rejectDecommissionedHostsConfig) {
336      return;
337    }
338
339    // Look for a match for the hostname in the list of decommissioned servers
340    for (ServerName server : getDrainingServersList()) {
341      if (Objects.equals(server.getHostname(), sn.getHostname())) {
342        // Found a match and master is configured to reject decommissioned hosts, throw exception!
343        LOG.warn(
344          "Rejecting RegionServer {} from reporting for duty because Master is configured "
345            + "to reject decommissioned hosts and this host was marked as such in the past.",
346          sn.getServerName());
347        throw new DecommissionedHostRejectedException(String.format(
348          "Host %s exists in the list of decommissioned servers and Master is configured to "
349            + "reject decommissioned hosts",
350          sn.getHostname()));
351      }
352    }
353  }
354
355  /**
   * Checks if a server with the same host and port already exists; if not, or if the existing one
   * has a smaller start code, record the new server.
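   * <p>
   * An illustrative sketch of the startcode rule applied here (hypothetical names):
   *
   * <pre>{@code
   * ServerName older = ServerName.valueOf("rs1.example.org", 16020, 100L);
   * ServerName newer = ServerName.valueOf("rs1.example.org", 16020, 200L);
   * // Recording "newer" succeeds and eventually expires "older"; trying to record "older"
   * // while "newer" is already registered returns false.
   * }</pre>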
358   * @param serverName the server to check and record
359   * @param sl         the server load on the server
360   * @return true if the server is recorded, otherwise, false
361   */
362  boolean checkAndRecordNewServer(final ServerName serverName, final ServerMetrics sl) {
363    ServerName existingServer = null;
364    synchronized (this.onlineServers) {
365      existingServer = findServerWithSameHostnamePortWithLock(serverName);
366      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
367        LOG.info("Server serverName=" + serverName + " rejected; we already have "
368          + existingServer.toString() + " registered with same hostname and port");
369        return false;
370      }
371      recordNewServerWithLock(serverName, sl);
372    }
373
374    // Tell our listeners that a server was added
375    if (!this.listeners.isEmpty()) {
376      for (ServerListener listener : this.listeners) {
377        listener.serverAdded(serverName);
378      }
379    }
380
381    // Note that we assume that same ts means same server, and don't expire in that case.
382    // TODO: ts can theoretically collide due to clock shifts, so this is a bit hacky.
383    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
384      LOG.info("Triggering server recovery; existingServer " + existingServer
385        + " looks stale, new server:" + serverName);
386      expireServer(existingServer);
387    }
388    return true;
389  }
390
391  /**
392   * Find out the region servers crashed between the crash of the previous master instance and the
393   * current master instance and schedule SCP for them.
394   * <p/>
395   * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
396   * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
397   * to find out whether there are servers which are already dead.
398   * <p/>
399   * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
   * concurrency issues.
401   * @param deadServersFromPE     the region servers which already have a SCP associated.
402   * @param liveServersFromWALDir the live region servers from wal directory.
403   */
404  void findDeadServersAndProcess(Set<ServerName> deadServersFromPE,
405    Set<ServerName> liveServersFromWALDir) {
406    deadServersFromPE.forEach(deadservers::putIfAbsent);
407    liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
408      .forEach(this::expireServer);
409  }
410
411  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the configured
   * maximum, this throws an exception; if it exceeds the configured warning threshold, it logs a
   * warning but lets the server start normally.
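   * <p>
   * The thresholds come from the master configuration (defaults are 30000 ms and 10000 ms); a
   * minimal sketch of tightening them, noting they are read when the ServerManager is constructed:
   *
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setLong("hbase.master.maxclockskew", 10000);    // reject above 10 s of skew
   * conf.setLong("hbase.master.warningclockskew", 3000); // warn above 3 s of skew
   * }</pre>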
   * @param serverName Incoming server's name
416   * @throws ClockOutOfSyncException if the skew exceeds the configured max value
417   */
418  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
419    throws ClockOutOfSyncException {
420    long skew = Math.abs(EnvironmentEdgeManager.currentTime() - serverCurrentTime);
421    if (skew > maxSkew) {
422      String message = "Server " + serverName + " has been "
423        + "rejected; Reported time is too far out of sync with master.  " + "Time difference of "
424        + skew + "ms > max allowed of " + maxSkew + "ms";
425      LOG.warn(message);
426      throw new ClockOutOfSyncException(message);
427    } else if (skew > warningSkew) {
428      String message = "Reported time for server " + serverName + " is out of sync with master "
429        + "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " + "error threshold is "
430        + maxSkew + "ms)";
431      LOG.warn(message);
432    }
433  }
434
435  /**
   * Called when a RegionServer first reports in for duty and thereafter each time it heartbeats,
   * to make sure it has not already been declared dead. If this server is on the dead list, reject
   * it with a YouAreDeadException. If it was dead but came back with a new start code, remove the
   * old entry from the dead list.
440   * @param what START or REPORT
441   */
442  private void checkIsDead(final ServerName serverName, final String what)
443    throws YouAreDeadException {
444    if (this.deadservers.isDeadServer(serverName)) {
445      // Exact match: host name, port and start code all match with existing one of the
446      // dead servers. So, this server must be dead. Tell it to kill itself.
447      String message =
448        "Server " + what + " rejected; currently processing " + serverName + " as dead server";
449      LOG.debug(message);
450      throw new YouAreDeadException(message);
451    }
452    // Remove dead server with same hostname and port of newly checking in rs after master
453    // initialization. See HBASE-5916 for more information.
454    if (
455      (this.master == null || this.master.isInitialized())
456        && this.deadservers.cleanPreviousInstance(serverName)
457    ) {
458      // This server has now become alive after we marked it as dead.
      // We removed its previous entry from the dead list to reflect it.
460      LOG.debug("{} {} came back up, removed it from the dead servers list", what, serverName);
461    }
462  }
463
464  /**
465   * Assumes onlineServers is locked.
   * @return ServerName with matching hostname and port, or null if none is online.
467   */
468  public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
469    ServerName end =
470      ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);
471
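    // ServerName sorts by hostname, then port, then startcode, so the greatest key strictly
    // below (hostname, port, Long.MAX_VALUE) is the registered instance for this host:port,
    // if one is online.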
472    ServerName r = onlineServers.lowerKey(end);
473    if (r != null) {
474      if (ServerName.isSameAddress(r, serverName)) {
475        return r;
476      }
477    }
478    return null;
479  }
480
481  /**
   * Adds to the onlineServers list. onlineServers should be locked.
   * @param serverName The remote server's name.
484   */
485  void recordNewServerWithLock(final ServerName serverName, final ServerMetrics sl) {
486    LOG.info("Registering regionserver=" + serverName);
487    this.onlineServers.put(serverName, sl);
488    master.getAssignmentManager().getRegionStates().createServer(serverName);
489  }
490
491  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
492    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
493    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
494    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
495    Map<byte[], Long> storeFlushedSequenceId =
496      storeFlushedSequenceIdsByRegion.get(encodedRegionName);
497    if (storeFlushedSequenceId != null) {
498      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
499        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
500          .setFamilyName(UnsafeByteOperations.unsafeWrap(entry.getKey()))
501          .setSequenceId(entry.getValue().longValue()).build());
502      }
503    }
504    return builder.build();
505  }
506
507  /** Returns ServerMetrics if serverName is known else null */
508  public ServerMetrics getLoad(final ServerName serverName) {
509    return this.onlineServers.get(serverName);
510  }
511
512  /**
513   * Compute the average load across all region servers. Currently, this uses a very naive
514   * computation - just uses the number of regions being served, ignoring stats about number of
515   * requests.
516   * @return the average load
517   */
518  public double getAverageLoad() {
519    int totalLoad = 0;
520    int numServers = 0;
521    for (ServerMetrics sl : this.onlineServers.values()) {
522      numServers++;
523      totalLoad += sl.getRegionMetrics().size();
524    }
525    return numServers == 0 ? 0 : (double) totalLoad / (double) numServers;
526  }
527
528  /** Returns the count of active regionservers */
529  public int countOfRegionServers() {
530    // Presumes onlineServers is a concurrent map
531    return this.onlineServers.size();
532  }
533
534  /** Returns Read-only map of servers to serverinfo */
535  public Map<ServerName, ServerMetrics> getOnlineServers() {
536    // Presumption is that iterating the returned Map is OK.
537    synchronized (this.onlineServers) {
538      return Collections.unmodifiableMap(this.onlineServers);
539    }
540  }
541
542  public DeadServer getDeadServers() {
543    return this.deadservers;
544  }
545
546  /**
   * Checks if any dead servers are currently being processed.
548   * @return true if any RS are being processed as dead, false if not
549   */
550  public boolean areDeadServersInProgress() {
551    return this.deadservers.areDeadServersInProgress();
552  }
553
554  void letRegionServersShutdown() {
555    long previousLogTime = 0;
556    ServerName sn = master.getServerName();
557    ZKWatcher zkw = master.getZooKeeper();
558    int onlineServersCt;
559    while ((onlineServersCt = onlineServers.size()) > 0) {
560      if (EnvironmentEdgeManager.currentTime() > (previousLogTime + 1000)) {
561        Set<ServerName> remainingServers = onlineServers.keySet();
562        synchronized (onlineServers) {
563          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
564            // Master will delete itself later.
565            return;
566          }
567        }
568        StringBuilder sb = new StringBuilder();
569        // It's ok here to not sync on onlineServers - merely logging
570        for (ServerName key : remainingServers) {
571          if (sb.length() > 0) {
572            sb.append(", ");
573          }
574          sb.append(key);
575        }
576        LOG.info("Waiting on regionserver(s) " + sb.toString());
577        previousLogTime = EnvironmentEdgeManager.currentTime();
578      }
579
580      try {
581        List<String> servers = getRegionServersInZK(zkw);
582        if (
583          servers == null || servers.isEmpty()
584            || (servers.size() == 1 && servers.contains(sn.toString()))
585        ) {
586          LOG.info("ZK shows there is only the master self online, exiting now");
587          // Master could have lost some ZK events, no need to wait more.
588          break;
589        }
590      } catch (KeeperException ke) {
591        LOG.warn("Failed to list regionservers", ke);
592        // ZK is malfunctioning, don't hang here
593        break;
594      }
595      synchronized (onlineServers) {
596        try {
597          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
598        } catch (InterruptedException ignored) {
599          // continue
600        }
601      }
602    }
603  }
604
605  private List<String> getRegionServersInZK(final ZKWatcher zkw) throws KeeperException {
606    return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
607  }
608
609  /**
   * Expire the passed server. Add it to the list of dead servers and queue a shutdown processing.
   * @return pid if we queued a ServerCrashProcedure, else {@link Procedure#NO_PROC_ID} if we did
   *         not (could happen for many reasons, including that it is this server that is going
   *         down, that we have already queued an SCP for this server, or that SCP processing is
   *         currently disabled because we are in the startup phase).
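   *         <p>
   *         A minimal caller-side sketch of interpreting the return value (names illustrative):
   *
   *         <pre>{@code
   *         long pid = serverManager.expireServer(serverName);
   *         if (pid == Procedure.NO_PROC_ID) {
   *           // no ServerCrashProcedure was queued; nothing to wait on
   *         }
   *         }</pre>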
615   */
616  // Redo test so we can make this protected.
617  public synchronized long expireServer(final ServerName serverName) {
618    return expireServer(serverName, false);
619
620  }
621
622  synchronized long expireServer(final ServerName serverName, boolean force) {
623    // THIS server is going down... can't handle our own expiration.
624    if (serverName.equals(master.getServerName())) {
625      if (!(master.isAborted() || master.isStopped())) {
626        master.stop("We lost our znode?");
627      }
628      return Procedure.NO_PROC_ID;
629    }
630    if (this.deadservers.isDeadServer(serverName)) {
631      LOG.warn("Expiration called on {} but already in DeadServer", serverName);
632      return Procedure.NO_PROC_ID;
633    }
634    moveFromOnlineToDeadServers(serverName);
635
636    // If server is in draining mode, remove corresponding znode
637    // In some tests, the mocked HM may not have ZK Instance, hence null check
638    if (master.getZooKeeper() != null) {
639      String drainingZnode = ZNodePaths
640        .joinZNode(master.getZooKeeper().getZNodePaths().drainingZNode, serverName.getServerName());
641      try {
642        ZKUtil.deleteNodeFailSilent(master.getZooKeeper(), drainingZnode);
643      } catch (KeeperException e) {
644        LOG.warn(
645          "Error deleting the draining znode for stopping server " + serverName.getServerName(), e);
646      }
647    }
648
649    // If cluster is going down, yes, servers are going to be expiring; don't
650    // process as a dead server
651    if (isClusterShutdown()) {
652      LOG.info("Cluster shutdown set; " + serverName + " expired; onlineServers="
653        + this.onlineServers.size());
654      if (this.onlineServers.isEmpty()) {
655        master.stop("Cluster shutdown set; onlineServer=0");
656      }
657      return Procedure.NO_PROC_ID;
658    }
659    LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
660    long pid = master.getAssignmentManager().submitServerCrash(serverName, true, force);
661    if (pid == Procedure.NO_PROC_ID) {
662      // skip later processing as we failed to submit SCP
663      return Procedure.NO_PROC_ID;
664    }
665    storage.expired(serverName);
666    // Tell our listeners that a server was removed
667    if (!this.listeners.isEmpty()) {
668      this.listeners.stream().forEach(l -> l.serverRemoved(serverName));
669    }
670    return pid;
671  }
672
673  /**
674   * Called when server has expired.
675   */
676  // Locking in this class needs cleanup.
677  public synchronized void moveFromOnlineToDeadServers(final ServerName sn) {
678    synchronized (this.onlineServers) {
679      boolean online = this.onlineServers.containsKey(sn);
680      if (online) {
681        // Remove the server from the known servers lists and update load info BUT
682        // add to deadservers first; do this so it'll show in dead servers list if
683        // not in online servers list.
684        this.deadservers.putIfAbsent(sn);
685        this.onlineServers.remove(sn);
686        onlineServers.notifyAll();
687      } else {
        // If not online, that is odd but may happen with 'Unknown Servers' -- where meta
        // has references to servers that are neither online nor in the dead servers list. If
        // it is an 'Unknown Server', don't add it to DeadServers or it will be there forever.
691        LOG.trace("Expiration of {} but server not online", sn);
692      }
693    }
694  }
695
  /**
   * Remove the server from the drain list.
   */
699  public synchronized boolean removeServerFromDrainList(final ServerName sn) {
700    LOG.info("Removing server {} from the draining list.", sn);
701
702    // Remove the server from the draining servers lists.
703    return this.drainingServers.remove(sn);
704  }
705
706  /**
707   * Add the server to the drain list.
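   * <p>
   * A minimal usage sketch, assuming {@code serverManager} is this instance and {@code sn} a
   * known server:
   *
   * <pre>{@code
   * serverManager.addServerToDrainList(sn);      // stop assigning new regions to sn
   * // ... decommissioning work happens here ...
   * serverManager.removeServerFromDrainList(sn); // make sn eligible for assignments again
   * }</pre>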
708   * @return True if the server is added or the server is already on the drain list.
709   */
710  public synchronized boolean addServerToDrainList(final ServerName sn) {
711    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
712    // However, we want to add servers even if they're not online if the master is configured
713    // to reject decommissioned hosts
714    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
715      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
716        sn);
717      return false;
718    }
719
720    // Add the server to the draining servers lists, if it's not already in it.
721    if (this.drainingServers.contains(sn)) {
722      LOG.warn(
723        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
724      return true;
725    }
726
727    LOG.info("Server {} added to draining server list.", sn);
728    return this.drainingServers.add(sn);
729  }
730
731  // RPC methods to region servers
732
733  private HBaseRpcController newRpcController() {
734    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
735  }
736
737  /**
738   * Sends a WARMUP RPC to the specified server to warmup the specified region.
739   * <p>
   * A region server could reject the warmup request because it either does not have the specified
   * region or the region is being split.
742   * @param server server to warmup a region
743   * @param region region to warmup
744   */
745  public void sendRegionWarmup(ServerName server, RegionInfo region) {
746    if (server == null) return;
747    try {
748      AdminService.BlockingInterface admin = getRsAdmin(server);
749      HBaseRpcController controller = newRpcController();
750      ProtobufUtil.warmupRegion(controller, admin, region);
751    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " + server + ", region: " + region
        + ", exception: " + e);
754    }
755  }
756
757  /**
   * Contacts a region server and waits up to timeout ms to close the region. This bypasses the
   * active HMaster. Pass -1 as the timeout if you do not want to wait on the result.
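   * <p>
   * A minimal sketch of a direct close with a 30 second wait (the connection, server and region
   * are assumed to be set up by the caller):
   *
   * <pre>{@code
   * ServerManager.closeRegionSilentlyAndWait(clusterConnection, serverName, regionInfo, 30000);
   * }</pre>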
760   */
761  public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server,
762    RegionInfo region, long timeout) throws IOException, InterruptedException {
763    AdminService.BlockingInterface rs = connection.getAdmin(server);
764    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
765    try {
766      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName());
767    } catch (IOException e) {
768      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
769    }
770    if (timeout < 0) {
771      return;
772    }
773    long expiration = timeout + EnvironmentEdgeManager.currentTime();
774    while (EnvironmentEdgeManager.currentTime() < expiration) {
775      controller.reset();
776      try {
777        RegionInfo rsRegion = ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
778        if (rsRegion == null) return;
779      } catch (IOException ioe) {
780        if (
781          ioe instanceof NotServingRegionException
782            || (ioe instanceof RemoteWithExtrasException && ((RemoteWithExtrasException) ioe)
783              .unwrapRemoteException() instanceof NotServingRegionException)
784        ) {
785          // no need to retry again
786          return;
787        }
788        LOG.warn("Exception when retrieving regioninfo from: " + region.getRegionNameAsString(),
789          ioe);
790      }
791      Thread.sleep(1000);
792    }
793    throw new IOException("Region " + region + " failed to close within" + " timeout " + timeout);
794  }
795
796  /**
797   * @return Admin interface for the remote regionserver named <code>sn</code>
798   * @throws RetriesExhaustedException wrapping a ConnectException if failed
799   */
800  public AdminService.BlockingInterface getRsAdmin(final ServerName sn) throws IOException {
801    LOG.debug("New admin connection to {}", sn);
802    if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
803      // A master is also a region server now, see HBASE-10569 for details
804      return ((HRegionServer) master).getRSRpcServices();
805    } else {
806      return this.connection.getAdmin(sn);
807    }
808  }
809
810  /**
   * Calculate the minimum number of servers necessary to start. This is not an absolute. It is
   * just a friction that will cause us to hang around a bit longer waiting on RegionServers to
   * check in.
813   */
814  private int getMinToStart() {
815    if (master.isInMaintenanceMode()) {
816      // If in maintenance mode, then master hosting meta will be the only server available
817      return 1;
818    }
819
820    int minimumRequired = 1;
821    if (
822      LoadBalancer.isTablesOnMaster(master.getConfiguration())
823        && LoadBalancer.isSystemTablesOnlyOnMaster(master.getConfiguration())
824    ) {
825      // If Master is carrying regions it will show up as a 'server', but is not handling user-
826      // space regions, so we need a second server.
827      minimumRequired = 2;
828    }
829
830    int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
831    // Ensure we are never less than minimumRequired else stuff won't work.
832    return Math.max(minToStart, minimumRequired);
833  }
834
835  /**
   * Wait for the region servers to report in. We will wait until one of these conditions is met:
   * <ul>
   * <li>the master is stopped</li>
   * <li>the 'hbase.master.wait.on.regionservers.maxtostart' number of region servers is
   * reached</li>
   * <li>the 'hbase.master.wait.on.regionservers.mintostart' is reached AND there has been no new
   * region server checking in for 'hbase.master.wait.on.regionservers.interval' time AND the
   * 'hbase.master.wait.on.regionservers.timeout' is reached</li>
   * </ul>
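   * <p>
   * A minimal sketch of tuning these knobs through the master configuration (the values shown are
   * illustrative only):
   *
   * <pre>{@code
   * Configuration conf = master.getConfiguration();
   * conf.setInt("hbase.master.wait.on.regionservers.mintostart", 3);
   * conf.setInt("hbase.master.wait.on.regionservers.maxtostart", 10);
   * conf.setLong("hbase.master.wait.on.regionservers.interval", 1500); // ms
   * conf.setLong("hbase.master.wait.on.regionservers.timeout", 4500);  // ms
   * }</pre>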
841   */
842  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
843    final long interval =
844      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
845    final long timeout =
846      this.master.getConfiguration().getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
847    // Min is not an absolute; just a friction making us wait longer on server checkin.
848    int minToStart = getMinToStart();
849    int maxToStart =
850      this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
851    if (maxToStart < minToStart) {
852      LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
853        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart, WAIT_ON_REGIONSERVERS_MINTOSTART,
854        minToStart));
855      maxToStart = Integer.MAX_VALUE;
856    }
857
858    long now = EnvironmentEdgeManager.currentTime();
859    final long startTime = now;
860    long slept = 0;
861    long lastLogTime = 0;
862    long lastCountChange = startTime;
863    int count = countOfRegionServers();
864    int oldCount = 0;
865    // This while test is a little hard to read. We try to comment it in below but in essence:
866    // Wait if Master is not stopped and the number of regionservers that have checked-in is
867    // less than the maxToStart. Both of these conditions will be true near universally.
868    // Next, we will keep cycling if ANY of the following three conditions are true:
869    // 1. The time since a regionserver registered is < interval (means servers are actively
870    // checking in).
871    // 2. We are under the total timeout.
872    // 3. The count of servers is < minimum.
873    for (ServerListener listener : this.listeners) {
874      listener.waiting();
875    }
876    while (
877      !this.master.isStopped() && !isClusterShutdown() && count < maxToStart
878        && ((lastCountChange + interval) > now || timeout > slept || count < minToStart)
879    ) {
880      // Log some info at every interval time or if there is a change
881      if (oldCount != count || lastLogTime + interval < now) {
882        lastLogTime = now;
883        String msg =
884          "Waiting on regionserver count=" + count + "; waited=" + slept + "ms, expecting min="
885            + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s), "
886            + "timeout=" + timeout + "ms, lastChange=" + (now - lastCountChange) + "ms";
887        LOG.info(msg);
888        status.setStatus(msg);
889      }
890
891      // We sleep for some time
892      final long sleepTime = 50;
893      Thread.sleep(sleepTime);
894      now = EnvironmentEdgeManager.currentTime();
895      slept = now - startTime;
896
897      oldCount = count;
898      count = countOfRegionServers();
899      if (count != oldCount) {
900        lastCountChange = now;
901      }
902    }
903    // Did we exit the loop because cluster is going down?
904    if (isClusterShutdown()) {
905      this.master.stop("Cluster shutdown");
906    }
907    LOG.info("Finished waiting on RegionServer count=" + count + "; waited=" + slept + "ms,"
908      + " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart)
909      + " server(s)," + " master is " + (this.master.isStopped() ? "stopped." : "running"));
910  }
911
912  private String getStrForMax(final int max) {
913    return max == Integer.MAX_VALUE ? "NO_LIMIT" : Integer.toString(max);
914  }
915
916  /** Returns A copy of the internal list of online servers. */
917  public List<ServerName> getOnlineServersList() {
918    // TODO: optimize the load balancer call so we don't need to make a new list
919    // TODO: FIX. THIS IS POPULAR CALL.
920    return new ArrayList<>(this.onlineServers.keySet());
921  }
922
923  /**
   * @param keys                 The target server names
925   * @param idleServerPredicator Evaluates the server on the given load
926   * @return A copy of the internal list of online servers matched by the predicator
927   */
928  public List<ServerName> getOnlineServersListWithPredicator(List<ServerName> keys,
929    Predicate<ServerMetrics> idleServerPredicator) {
930    List<ServerName> names = new ArrayList<>();
931    if (keys != null && idleServerPredicator != null) {
932      keys.forEach(name -> {
933        ServerMetrics load = onlineServers.get(name);
934        if (load != null) {
935          if (idleServerPredicator.test(load)) {
936            names.add(name);
937          }
938        }
939      });
940    }
941    return names;
942  }
943
944  /** Returns A copy of the internal list of draining servers. */
945  public List<ServerName> getDrainingServersList() {
946    return new ArrayList<>(this.drainingServers);
947  }
948
949  public boolean isServerOnline(ServerName serverName) {
950    return serverName != null && onlineServers.containsKey(serverName);
951  }
952
953  public enum ServerLiveState {
954    LIVE,
955    DEAD,
956    UNKNOWN
957  }
958
959  /** Returns whether the server is online, dead, or unknown. */
960  public synchronized ServerLiveState isServerKnownAndOnline(ServerName serverName) {
961    return onlineServers.containsKey(serverName)
962      ? ServerLiveState.LIVE
963      : (deadservers.isDeadServer(serverName) ? ServerLiveState.DEAD : ServerLiveState.UNKNOWN);
964  }
965
966  /**
967   * Check if a server is known to be dead. A server can be online, or known to be dead, or unknown
   * to this manager (i.e., not online, not known to be dead either; it is simply not tracked by
   * the
969   * master any more, for example, a very old previous instance).
970   */
971  public synchronized boolean isServerDead(ServerName serverName) {
972    return serverName == null || deadservers.isDeadServer(serverName);
973  }
974
975  /**
976   * Check if a server is unknown. A server can be online, or known to be dead, or unknown to this
   * manager (i.e., not online, not known to be dead either; it is simply not tracked by the master
978   * any more, for example, a very old previous instance).
979   */
980  public boolean isServerUnknown(ServerName serverName) {
981    return serverName == null
982      || (!onlineServers.containsKey(serverName) && !deadservers.isDeadServer(serverName));
983  }
984
985  public void shutdownCluster() {
986    String statusStr = "Cluster shutdown requested of master=" + this.master.getServerName();
987    LOG.info(statusStr);
988    this.clusterShutdown.set(true);
989    if (onlineServers.isEmpty()) {
990      // we do not synchronize here so this may cause a double stop, but not a big deal
991      master.stop("OnlineServer=0 right after cluster shutdown set");
992    }
993  }
994
995  public boolean isClusterShutdown() {
996    return this.clusterShutdown.get();
997  }
998
999  /**
1000   * Stop the ServerManager.
1001   */
1002  public void stop() {
1003    // Nothing to do.
1004  }
1005
1006  /**
1007   * Creates a list of possible destinations for a region. It contains the online servers, but not
1008   * the draining or dying servers.
1009   * @param serversToExclude can be null if there is no server to exclude
1010   */
1011  public List<ServerName> createDestinationServersList(final List<ServerName> serversToExclude) {
1012    Set<ServerName> destServers = new HashSet<>();
1013    onlineServers.forEach((sn, sm) -> {
1014      if (sm.getLastReportTimestamp() > 0) {
        // This means we have already called regionServerReport at least once, so let's include
1016        // this server for region assignment. This is an optimization to avoid assigning regions to
1017        // an uninitialized server. See HBASE-25032 for more details.
1018        destServers.add(sn);
1019      }
1020    });
1021
1022    if (serversToExclude != null) {
1023      destServers.removeAll(serversToExclude);
1024    }
1025
1026    // Loop through the draining server list and remove them from the server list
1027    final List<ServerName> drainingServersCopy = getDrainingServersList();
1028    destServers.removeAll(drainingServersCopy);
1029
1030    return new ArrayList<>(destServers);
1031  }
1032
1033  /**
1034   * Calls {@link #createDestinationServersList} without server to exclude.
1035   */
1036  public List<ServerName> createDestinationServersList() {
1037    return createDestinationServersList(null);
1038  }
1039
1040  /**
   * Clear any dead server that has the same host name and port as an online server.
1042   */
1043  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1044    for (ServerName serverName : getOnlineServersList()) {
1045      deadservers.cleanAllPreviousInstances(serverName);
1046    }
1047  }
1048
1049  /**
1050   * Called by delete table and similar to notify the ServerManager that a region was removed.
1051   */
1052  public void removeRegion(final RegionInfo regionInfo) {
1053    final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1054    storeFlushedSequenceIdsByRegion.remove(encodedName);
1055    flushedSequenceIdByRegion.remove(encodedName);
1056  }
1057
1058  public boolean isRegionInServerManagerStates(final RegionInfo hri) {
1059    final byte[] encodedName = hri.getEncodedNameAsBytes();
1060    return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
1061      || flushedSequenceIdByRegion.containsKey(encodedName));
1062  }
1063
1064  /**
1065   * Called by delete table and similar to notify the ServerManager that a region was removed.
1066   */
1067  public void removeRegions(final List<RegionInfo> regions) {
1068    for (RegionInfo hri : regions) {
1069      removeRegion(hri);
1070    }
1071  }
1072
1073  /**
1074   * May return 0 when server is not online.
1075   */
1076  public int getVersionNumber(ServerName serverName) {
1077    ServerMetrics serverMetrics = onlineServers.get(serverName);
1078    return serverMetrics != null ? serverMetrics.getVersionNumber() : 0;
1079  }
1080
1081  /**
1082   * May return "0.0.0" when server is not online
1083   */
1084  public String getVersion(ServerName serverName) {
1085    ServerMetrics serverMetrics = onlineServers.get(serverName);
1086    return serverMetrics != null ? serverMetrics.getVersion() : "0.0.0";
1087  }
1088
1089  public int getInfoPort(ServerName serverName) {
1090    ServerMetrics serverMetrics = onlineServers.get(serverName);
1091    return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
1092  }
1093}