001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Comparator;
023import java.util.EnumSet;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Objects;
027import java.util.Set;
028import java.util.TreeSet;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ClusterManager.ServiceType;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionLocator;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Manages the interactions with an already deployed distributed cluster (as opposed to a
045 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
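 * <p>
 * A minimal usage sketch, for illustration only. It assumes a deployed-cluster
 * {@link ClusterManager} implementation such as the SSH-based {@code HBaseClusterManager} from
 * the integration-test module (the no-arg constructor and setConf-only configuration shown below
 * are assumptions), and the timeout value is arbitrary:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Assumed: HBaseClusterManager is on the classpath and is configured purely via setConf().
 * HBaseClusterManager clusterManager = new HBaseClusterManager();
 * clusterManager.setConf(conf);
 * DistributedHBaseCluster cluster = new DistributedHBaseCluster(conf, clusterManager);
 * try {
 *   ClusterMetrics initial = cluster.getInitialClusterMetrics();
 *   ServerName activeMaster = initial.getMasterName();
 *   cluster.stopMaster(activeMaster);
 *   cluster.waitForMasterToStop(activeMaster, 60000);
 *   // ... exercise the cluster while the master is down ...
 *   cluster.restoreClusterMetrics(initial);
 * } finally {
 *   cluster.close();
 * }
 * </pre>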
046 */
047@InterfaceAudience.Private
048public class DistributedHBaseCluster extends HBaseClusterInterface {
049
050  private static final Logger LOG = LoggerFactory.getLogger(DistributedHBaseCluster.class);
051
052  private Admin admin;
053  private final Connection connection;
054
055  private ClusterManager clusterManager;
056  /**
057   * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
058   * restarted instances of the same server will have different ServerName and will not coincide
059   * with past dead ones. So there's no need to cleanup this list.
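   * <p>
   * Illustrative sketch of that property (host, port and start codes are made-up values):
   *
   * <pre>
   * ServerName before = ServerName.valueOf("rs-1.example.com", 16020, 1600000000000L);
   * ServerName after = ServerName.valueOf("rs-1.example.com", 16020, 1600000100000L);
   * assert !before.equals(after); // different start codes, so different ServerNames
   * assert ServerName.isSameAddress(before, after); // but the same host and port
   * </pre>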
060   */
061  private final Set<ServerName> killedRegionServers = new HashSet<>();
062
063  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
064    throws IOException {
065    super(conf);
066    this.clusterManager = clusterManager;
067    this.connection = ConnectionFactory.createConnection(conf);
068    this.admin = this.connection.getAdmin();
069    this.initialClusterStatus = getClusterMetrics();
070  }
071
072  public void setClusterManager(ClusterManager clusterManager) {
073    this.clusterManager = clusterManager;
074  }
075
076  public ClusterManager getClusterManager() {
077    return clusterManager;
078  }
079
080  /**
081   * Returns a ClusterStatus for this HBase cluster
082   */
083  @Override
084  public ClusterMetrics getClusterMetrics() throws IOException {
085    return admin.getClusterMetrics();
086  }
087
088  @Override
089  public ClusterMetrics getInitialClusterMetrics() throws IOException {
090    return initialClusterStatus;
091  }
092
093  @Override
094  public void close() throws IOException {
095    if (this.admin != null) {
096      admin.close();
097    }
098    if (this.connection != null && !this.connection.isClosed()) {
099      this.connection.close();
100    }
101  }
102
103  @Override
104  public void startRegionServer(String hostname, int port) throws IOException {
105    LOG.info("Starting RS on: {}", hostname);
106    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
107  }
108
109  @Override
110  public void killRegionServer(ServerName serverName) throws IOException {
111    LOG.info("Aborting RS: {}", serverName.getServerName());
112    killedRegionServers.add(serverName);
113    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
114      serverName.getPort());
115  }
116
117  @Override
118  public boolean isKilledRS(ServerName serverName) {
119    return killedRegionServers.contains(serverName);
120  }
121
122  @Override
123  public void stopRegionServer(ServerName serverName) throws IOException {
124    LOG.info("Stopping RS: {}", serverName.getServerName());
125    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
126      serverName.getPort());
127  }
128
129  @Override
130  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
131    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
132  }
133
134  @Override
135  public void waitForRegionServerToSuspend(ServerName serverName, long timeout) throws IOException {
136    waitForServiceToSuspend(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
137  }
138
139  @Override
140  public void waitForRegionServerToResume(ServerName serverName, long timeout) throws IOException {
141    waitForServiceToResume(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
142  }
143
144  @Override
145  public void suspendRegionServer(ServerName serverName) throws IOException {
146    LOG.info("Suspend RS: {}", serverName.getServerName());
147    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
148      serverName.getPort());
149  }
150
151  @Override
152  public void resumeRegionServer(ServerName serverName) throws IOException {
153    LOG.info("Resume RS: {}", serverName.getServerName());
154    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
155      serverName.getPort());
156  }
157
158  @Override
159  public void startZkNode(String hostname, int port) throws IOException {
160    LOG.info("Starting ZooKeeper node on: {}", hostname);
161    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
162  }
163
164  @Override
165  public void killZkNode(ServerName serverName) throws IOException {
166    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
167    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
168      serverName.getPort());
169  }
170
171  @Override
172  public void stopZkNode(ServerName serverName) throws IOException {
173    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
174    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
175      serverName.getPort());
176  }
177
178  @Override
179  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
180    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
181  }
182
183  @Override
184  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
185    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
186  }
187
188  @Override
189  public void startDataNode(ServerName serverName) throws IOException {
190    LOG.info("Starting data node on: {}", serverName.getServerName());
191    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
192      serverName.getPort());
193  }
194
195  @Override
196  public void killDataNode(ServerName serverName) throws IOException {
197    LOG.info("Aborting data node on: {}", serverName.getServerName());
198    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
199      serverName.getPort());
200  }
201
202  @Override
203  public void stopDataNode(ServerName serverName) throws IOException {
204    LOG.info("Stopping data node on: {}", serverName.getServerName());
205    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
206      serverName.getPort());
207  }
208
209  @Override
210  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
211    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
212  }
213
214  @Override
215  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
216    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
217  }
218
219  @Override
220  public void startNameNode(ServerName serverName) throws IOException {
221    LOG.info("Starting name node on: {}", serverName.getServerName());
222    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
223      serverName.getPort());
224  }
225
226  @Override
227  public void killNameNode(ServerName serverName) throws IOException {
228    LOG.info("Aborting name node on: {}", serverName.getServerName());
229    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
230      serverName.getPort());
231  }
232
233  @Override
234  public void stopNameNode(ServerName serverName) throws IOException {
235    LOG.info("Stopping name node on: {}", serverName.getServerName());
236    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
237      serverName.getPort());
238  }
239
240  @Override
241  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
242    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
243  }
244
245  @Override
246  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
247    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
248  }
249
250  @Override
251  public void startJournalNode(ServerName serverName) throws IOException {
252    LOG.info("Starting journal node on: {}", serverName.getServerName());
253    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
254      serverName.getPort());
255  }
256
257  @Override
258  public void killJournalNode(ServerName serverName) throws IOException {
259    LOG.info("Aborting journal node on: {}", serverName.getServerName());
260    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
261      serverName.getPort());
262  }
263
264  @Override
265  public void stopJournalNode(ServerName serverName) throws IOException {
266    LOG.info("Stopping journal node on: {}", serverName.getServerName());
267    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
268      serverName.getPort());
269  }
270
271  @Override
272  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
273    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
274  }
275
276  @Override
277  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
278    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
279  }
280
281  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
282    throws IOException {
283    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
284    long start = EnvironmentEdgeManager.currentTime();
285
286    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
287      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
288        return;
289      }
290      Threads.sleep(100);
291    }
292    throw new IOException("Timed-out waiting for service to stop: " + serverName);
293  }
294
295  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
296    throws IOException {
297    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
298    long start = EnvironmentEdgeManager.currentTime();
299
300    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
301      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
302        return;
303      }
304      Threads.sleep(100);
305    }
306    throw new IOException("Timed-out waiting for service to start: " + serverName);
307  }
308
309  private void waitForServiceToSuspend(ServiceType service, ServerName serverName, long timeout)
310    throws IOException {
311    LOG.info("Waiting for service: {} to suspend: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (clusterManager.isSuspended(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed out waiting for service to suspend: " + serverName);
321  }
322
323  private void waitForServiceToResume(ServiceType service, ServerName serverName, long timeout)
324    throws IOException {
325    LOG.info("Waiting for service: {} to resume: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (clusterManager.isResumed(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed out waiting for service to resume: " + serverName);
335  }
336
337  @Override
338  public void startMaster(String hostname, int port) throws IOException {
339    LOG.info("Starting Master on: {}:{}", hostname, port);
340    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
341  }
342
343  @Override
344  public void killMaster(ServerName serverName) throws IOException {
345    LOG.info("Aborting Master: {}", serverName.getServerName());
346    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
347  }
348
349  @Override
350  public void stopMaster(ServerName serverName) throws IOException {
351    LOG.info("Stopping Master: {}", serverName.getServerName());
352    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
353  }
354
355  @Override
356  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
357    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
358  }
359
360  @Override
361  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
362    long start = EnvironmentEdgeManager.currentTime();
363    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      try (Admin freshAdmin = connection.getAdmin()) {
        freshAdmin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.HBASE_VERSION));
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet", m);
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK", e);
      }
372      Threads.sleep(1000);
373    }
374    return false;
375  }
376
377  @Override
378  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
379    byte[] startKey = RegionInfo.getStartKey(regionName);
380    HRegionLocation regionLoc = null;
381    try (RegionLocator locator = connection.getRegionLocator(tn)) {
382      regionLoc = locator.getRegionLocation(startKey, true);
383    }
384    if (regionLoc == null) {
385      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
386      return null;
387    }
388    return regionLoc.getServerName();
389  }
390
391  @Override
392  public void waitUntilShutDown() {
    // Waiting a few seconds after issuing the clusterManager kill commands might suffice here,
    // but this is not implemented yet.
394    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
395  }
396
397  @Override
398  public void shutdown() throws IOException {
399    // not sure we want this
400    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
401  }
402
403  @Override
404  public boolean isDistributedCluster() {
405    return true;
406  }
407
408  @Override
409  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
410    ClusterMetrics current = getClusterMetrics();
411
412    LOG.info("Restoring cluster - started");
413
414    // do a best effort restore
415    boolean success = restoreMasters(initial, current);
416    success = restoreRegionServers(initial, current) && success;
417    success = restoreAdmin() && success;
418
419    LOG.info("Restoring cluster - done");
420    return success;
421  }
422
423  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
424    List<IOException> deferred = new ArrayList<>();
425    // check whether current master has changed
426    final ServerName initMaster = initial.getMasterName();
427    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
428      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
429        initMaster.getAddress(), current.getMasterName().getAddress());
430      // If initial master is stopped, start it, before restoring the state.
431      // It will come up as a backup master, if there is already an active master.
432      try {
433        if (
434          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
435            initMaster.getPort())
436        ) {
437          LOG.info("Restoring cluster - starting initial active master at:{}",
438            initMaster.getAddress());
439          startMaster(initMaster.getHostname(), initMaster.getPort());
440        }
441
        // The active master has changed; undo this:
        // 1. Stop the current backup masters
        // 2. Stop the current active master
        // 3. Start the initial backup masters
446        for (ServerName currentBackup : current.getBackupMasterNames()) {
447          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
448            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
449            stopMaster(currentBackup);
450          }
451        }
452        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
453        stopMaster(current.getMasterName());
454        waitForActiveAndReadyMaster(); // wait so that active master takes over
455      } catch (IOException ex) {
456        // if we fail to start the initial active master, we do not want to continue stopping
457        // backup masters. Just keep what we have now
458        deferred.add(ex);
459      }
460
461      // start backup masters
462      for (ServerName backup : initial.getBackupMasterNames()) {
463        try {
464          // these are not started in backup mode, but we should already have an active master
465          if (
466            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
467              backup.getPort())
468          ) {
469            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
470            startMaster(backup.getHostname(), backup.getPort());
471          }
472        } catch (IOException ex) {
473          deferred.add(ex);
474        }
475      }
476    } else {
477      // current master has not changed, match up backup masters
478      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
479      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
480      toStart.addAll(initial.getBackupMasterNames());
481      toKill.addAll(current.getBackupMasterNames());
482
483      for (ServerName server : current.getBackupMasterNames()) {
484        toStart.remove(server);
485      }
486      for (ServerName server : initial.getBackupMasterNames()) {
487        toKill.remove(server);
488      }
489
490      for (ServerName sn : toStart) {
491        try {
492          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
493            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
494            startMaster(sn.getHostname(), sn.getPort());
495          }
496        } catch (IOException ex) {
497          deferred.add(ex);
498        }
499      }
500
501      for (ServerName sn : toKill) {
502        try {
503          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
504            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
505            stopMaster(sn);
506          }
507        } catch (IOException ex) {
508          deferred.add(ex);
509        }
510      }
511    }
512    if (!deferred.isEmpty()) {
513      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
514      for (int i = 0; i < deferred.size() && i < 3; i++) {
515        LOG.warn(Objects.toString(deferred.get(i)));
516      }
517    }
518
519    return deferred.isEmpty();
520  }
521
  /**
   * Orders ServerNames by hostname and port only, ignoring the start code, so that a restarted
   * instance of a server matches its previous incarnation.
   */
  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (compare != 0) {
        return compare;
      }
      return Integer.compare(o1.getPort(), o2.getPort());
    }
  }
532
533  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
534    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
535    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
536    toStart.addAll(initial.getLiveServerMetrics().keySet());
537    toKill.addAll(current.getLiveServerMetrics().keySet());
538
539    ServerName master = initial.getMasterName();
540
541    for (ServerName server : current.getLiveServerMetrics().keySet()) {
542      toStart.remove(server);
543    }
544    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
545      toKill.remove(server);
546    }
547
548    List<IOException> deferred = new ArrayList<>();
549
550    for (ServerName sn : toStart) {
551      try {
552        if (
553          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
554            && master.getPort() != sn.getPort()
555        ) {
556          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
557          startRegionServer(sn.getHostname(), sn.getPort());
558        }
559      } catch (IOException ex) {
560        deferred.add(ex);
561      }
562    }
563
564    for (ServerName sn : toKill) {
565      try {
566        if (
567          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
568            && master.getPort() != sn.getPort()
569        ) {
570          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
571          stopRegionServer(sn);
572        }
573      } catch (IOException ex) {
574        deferred.add(ex);
575      }
576    }
577    if (!deferred.isEmpty()) {
578      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
579      for (int i = 0; i < deferred.size() && i < 3; i++) {
580        LOG.warn(Objects.toString(deferred.get(i)));
581      }
582    }
583
584    return deferred.isEmpty();
585  }
586
587  protected boolean restoreAdmin() throws IOException {
    // If the master that was initially active went down and the restore above put the cluster
    // back to its initial configuration, the cached Admin instance may return stale information.
    // Replace it with a fresh instance from the connection.
    admin.close();
    this.admin = this.connection.getAdmin();
    LOG.info("Restoring cluster - created new Admin instance");
595    return true;
596  }
597}