001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.Pair;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037
038/**
039 * Class to hold dead servers list and utility querying dead server list. Servers are added when
040 * they expire or when we find them in filesystem on startup. When a server crash procedure is
041 * queued, it will populate the processing list and then remove the server from processing list when
042 * done. Servers are removed from dead server list when a new instance is started over the old on
043 * same hostname and port or when new Master comes online tidying up after all initialization.
044 * Processing list and deadserver list are not tied together (you don't have to be in deadservers
045 * list to be processing and vice versa).
046 */
047@InterfaceAudience.Private
048public class DeadServer {
049  private static final Logger LOG = LoggerFactory.getLogger(DeadServer.class);
050
051  /**
052   * Set of known dead servers. On znode expiration, servers are added here. This is needed in case
053   * of a network partitioning where the server's lease expires, but the server is still running.
054   * After the network is healed, and it's server logs are recovered, it will be told to call server
055   * startup because by then, its regions have probably been reassigned.
056   */
057  private final Map<ServerName, Long> deadServers = new HashMap<>();
058
059  /**
060   * Set of dead servers currently being processed by a SCP. Added to this list at the start of SCP
061   * and removed after it is done processing the crash.
062   */
063  private final Set<ServerName> processingServers = new HashSet<>();
064
065  /**
066   * @param serverName server name.
067   * @return true if this server is on the dead servers list false otherwise
068   */
069  public synchronized boolean isDeadServer(final ServerName serverName) {
070    return deadServers.containsKey(serverName);
071  }
072
073  /**
074   * Checks if there are currently any dead servers being processed by the master. Returns true if
075   * at least one region server is currently being processed as dead.
076   * @return true if any RS are being processed as dead
077   */
078  synchronized boolean areDeadServersInProgress() {
079    return !processingServers.isEmpty();
080  }
081
082  public synchronized Set<ServerName> copyServerNames() {
083    Set<ServerName> clone = new HashSet<>(deadServers.size());
084    clone.addAll(deadServers.keySet());
085    return clone;
086  }
087
088  /**
089   * Adds the server to the dead server list if it's not there already.
090   */
091  synchronized void putIfAbsent(ServerName sn) {
092    this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
093    processing(sn);
094  }
095
096  /**
097   * Add <code>sn<</code> to set of processing deadservers.
098   * @see #finish(ServerName)
099   */
100  public synchronized void processing(ServerName sn) {
101    if (processingServers.add(sn)) {
102      // Only log on add.
103      LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
104    }
105  }
106
107  /**
108   * Complete processing for this dead server.
109   * @param sn ServerName for the dead server.
110   * @see #processing(ServerName)
111   */
112  public synchronized void finish(ServerName sn) {
113    if (processingServers.remove(sn)) {
114      LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
115    }
116  }
117
118  public synchronized int size() {
119    return deadServers.size();
120  }
121
122  synchronized boolean isEmpty() {
123    return deadServers.isEmpty();
124  }
125
126  /**
127   * Handles restart of a server. The new server instance has a different start code. The new start
128   * code should be greater than the old one. We don't check that here. Removes the old server from
129   * deadserver list.
130   * @param newServerName Servername as either <code>host:port</code> or
131   *                      <code>host,port,startcode</code>.
132   * @return true if this server was dead before and coming back alive again
133   */
134  synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
135    Iterator<ServerName> it = deadServers.keySet().iterator();
136    while (it.hasNext()) {
137      if (cleanOldServerName(newServerName, it)) {
138        return true;
139      }
140    }
141    return false;
142  }
143
144  synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
145    Iterator<ServerName> it = deadServers.keySet().iterator();
146    while (it.hasNext()) {
147      cleanOldServerName(newServerName, it);
148    }
149  }
150
151  /**
152   * @param newServerName      Server to match port and hostname against.
153   * @param deadServerIterator Iterator primed so can call 'next' on it.
154   * @return True if <code>newServerName</code> and current primed iterator ServerName have same
155   *         host and port and we removed old server from iterator and from processing list.
156   */
157  private boolean cleanOldServerName(ServerName newServerName,
158    Iterator<ServerName> deadServerIterator) {
159    ServerName sn = deadServerIterator.next();
160    if (ServerName.isSameAddress(sn, newServerName)) {
161      // Remove from dead servers list. Don't remove from the processing list --
162      // let the SCP do it when it is done.
163      deadServerIterator.remove();
164      return true;
165    }
166    return false;
167  }
168
169  @Override
170  public synchronized String toString() {
171    // Display unified set of servers from both maps
172    Set<ServerName> servers = new HashSet<>();
173    servers.addAll(deadServers.keySet());
174    servers.addAll(processingServers);
175    StringBuilder sb = new StringBuilder();
176    for (ServerName sn : servers) {
177      if (sb.length() > 0) {
178        sb.append(", ");
179      }
180      sb.append(sn.toString());
181      // Star entries that are being processed
182      if (processingServers.contains(sn)) {
183        sb.append("*");
184      }
185    }
186    return sb.toString();
187  }
188
189  /**
190   * Extract all the servers dead since a given time, and sort them.
191   * @param ts the time, 0 for all
192   * @return a sorted array list, by death time, lowest values first.
193   */
194  synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
195    List<Pair<ServerName, Long>> res = new ArrayList<>(size());
196
197    for (Map.Entry<ServerName, Long> entry : deadServers.entrySet()) {
198      if (entry.getValue() >= ts) {
199        res.add(new Pair<>(entry.getKey(), entry.getValue()));
200      }
201    }
202
203    Collections.sort(res, (o1, o2) -> o1.getSecond().compareTo(o2.getSecond()));
204    return res;
205  }
206
207  /**
208   * Get the time when a server died
209   * @param deadServerName the dead server name
210   * @return the date when the server died
211   */
212  public synchronized Date getTimeOfDeath(final ServerName deadServerName) {
213    Long time = deadServers.get(deadServerName);
214    return time == null ? null : new Date(time);
215  }
216
217  /**
218   * Called from rpc by operator cleaning up deadserver list.
219   * @param deadServerName the dead server name
220   * @return true if this server was removed
221   */
222  public synchronized boolean removeDeadServer(final ServerName deadServerName) {
223    Preconditions.checkState(!processingServers.contains(deadServerName),
224      "Asked to remove server still in processingServers set " + deadServerName + " (numProcessing="
225        + processingServers.size() + ")");
226    return this.deadServers.remove(deadServerName) != null;
227  }
228}