/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to a
 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
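 * <p>
 * A minimal usage sketch is shown below; it assumes a site-specific {@link ClusterManager}
 * implementation (left elided here) that can start, stop and kill processes on the deployed
 * hosts:
 * </p>
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * ClusterManager clusterManager = ...; // site-specific implementation, e.g. SSH based
 * try (DistributedHBaseCluster cluster = new DistributedHBaseCluster(conf, clusterManager)) {
 *   ClusterMetrics metrics = cluster.getClusterMetrics();
 *   // exercise the deployed cluster, e.g. kill and later restart region servers
 * }
 * </pre>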
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster {
  private Admin admin;
  private final Connection connection;

  private ClusterManager clusterManager;
  /**
   * Set of RegionServers killed so far. A ServerName also includes the startcode of a server, so
   * any restarted instance of the same server will have a different ServerName and will not
   * coincide with past dead ones. So there is no need to clean up this set.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();

  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
    throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.connection = ConnectionFactory.createConnection(conf);
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterMetrics();
  }

  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns a ClusterMetrics for this HBase cluster.
   */
  @Override
  public ClusterMetrics getClusterMetrics() throws IOException {
    return admin.getClusterMetrics();
  }

  @Override
  public ClusterMetrics getInitialClusterMetrics() throws IOException {
    return initialClusterStatus;
  }

  @Override
  public void close() throws IOException {
    if (this.admin != null) {
      admin.close();
    }
    if (this.connection != null && !this.connection.isClosed()) {
      this.connection.close();
    }
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
    throws IOException {
    return ((ClusterConnection) this.connection).getAdmin(serverName);
  }

  @Override
  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
    throws IOException {
    return ((ClusterConnection) this.connection).getClient(serverName);
  }

  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
116    LOG.info("Starting RS on: {}", hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
  }

  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: {}", serverName.getServerName());
    killedRegionServers.add(serverName);
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public boolean isKilledRS(ServerName serverName) {
    return killedRegionServers.contains(serverName);
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void suspendRegionServer(ServerName serverName) throws IOException {
    LOG.info("Suspend RS: {}", serverName.getServerName());
    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void resumeRegionServer(ServerName serverName) throws IOException {
    LOG.info("Resume RS: {}", serverName.getServerName());
    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void startZkNode(String hostname, int port) throws IOException {
161    LOG.info("Starting ZooKeeper node on: {}", hostname);
    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
  }

  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.info("Starting data node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.info("Aborting data node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.info("Stopping data node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.info("Starting name node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.info("Aborting name node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.info("Stopping name node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void startJournalNode(ServerName serverName) throws IOException {
    LOG.info("Starting journal node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killJournalNode(ServerName serverName) throws IOException {
    LOG.info("Aborting journal node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopJournalNode(ServerName serverName) throws IOException {
    LOG.info("Stopping journal node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  @Override
  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException(
      "Timed out waiting for service " + service + " to stop: " + serverName);
  }

  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException(
      "Timed out waiting for service " + service + " to start: " + serverName);
  }

  @Override
  public MasterService.BlockingInterface getMasterAdminService() throws IOException {
    return ((ClusterConnection) this.connection).getMaster();
  }

  @Override
  public void startMaster(String hostname, int port) throws IOException {
    LOG.info("Starting Master on: {}:{}", hostname, port);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      try {
        getMasterAdminService();
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet: {}", m.toString());
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK: {}", e.toString());
      }
      Threads.sleep(1000);
    }
    return false;
  }

  @Override
  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
    byte[] startKey = RegionInfo.getStartKey(regionName);
    HRegionLocation regionLoc = null;
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      regionLoc = locator.getRegionLocation(startKey, true);
    }
    if (regionLoc == null) {
      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
      return null;
    }
    return regionLoc.getServerName();
  }

  @Override
  public void waitUntilShutDown() {
    // TODO: not implemented; we could simply wait for a few seconds after issuing the kills.
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public void shutdown() throws IOException {
    // Not implemented; it is not clear that we ever want to shut down a deployed cluster here.
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  @Override
  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
    ClusterMetrics current = getClusterMetrics();

    LOG.info("Restoring cluster - started");

    // do a best effort restore
    boolean success = restoreMasters(initial, current);
    success = restoreRegionServers(initial, current) && success;
    success = restoreAdmin() && success;

    LOG.info("Restoring cluster - done");
    return success;
  }

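  /**
   * Best-effort attempt to bring the active and backup masters back to their initial layout.
   * Errors are collected and logged rather than rethrown.
   * @return true if no errors were encountered while starting or stopping masters
   */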
  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
    List<IOException> deferred = new ArrayList<>();
    // check whether current master has changed
    final ServerName initMaster = initial.getMasterName();
    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
406      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
407        initMaster.getAddress(), current.getMasterName().getAddress());
      // If the initial master is stopped, start it before restoring the state. It will come up
      // as a backup master if there is already an active master.
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
            initMaster.getPort())
        ) {
415          LOG.info("Restoring cluster - starting initial active master at:{}",
416            initMaster.getAddress());
          startMaster(initMaster.getHostname(), initMaster.getPort());
        }

        // master has changed, we would like to undo this.
        // 1. Kill the current backups
        // 2. Stop current master
        // 3. Start backup masters
        for (ServerName currentBackup : current.getBackupMasterNames()) {
          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
            stopMaster(currentBackup);
          }
        }
        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
        stopMaster(current.getMasterName());
        waitForActiveAndReadyMaster(); // wait so that active master takes over
      } catch (IOException ex) {
        // if we fail to start the initial active master, we do not want to continue stopping
        // backup masters. Just keep what we have now
        deferred.add(ex);
      }

      // start backup masters
      for (ServerName backup : initial.getBackupMasterNames()) {
        try {
          // these are not started in backup mode, but we should already have an active master
          if (
            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
              backup.getPort())
          ) {
            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
            startMaster(backup.getHostname(), backup.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    } else {
      // current master has not changed, match up backup masters
      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      toStart.addAll(initial.getBackupMasterNames());
      toKill.addAll(current.getBackupMasterNames());

      for (ServerName server : current.getBackupMasterNames()) {
        toStart.remove(server);
      }
      for (ServerName server : initial.getBackupMasterNames()) {
        toKill.remove(server);
      }

      for (ServerName sn : toStart) {
        try {
          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
            startMaster(sn.getHostname(), sn.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }

      for (ServerName sn : toKill) {
        try {
          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
            stopMaster(sn);
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    }
    if (!deferred.isEmpty()) {
491      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
      for (int i = 0; i < deferred.size() && i < 3; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }

    return deferred.isEmpty();
  }

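  /**
   * Orders ServerName instances by hostname (case-insensitive) and port only, ignoring the
   * startcode, so that a restarted process on the same host and port compares as the same server.
   */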
  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (compare != 0) {
        return compare;
      }
      return Integer.compare(o1.getPort(), o2.getPort());
    }
  }

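  /**
   * Best-effort attempt to bring the set of live region servers back to its initial layout;
   * servers on the master's port are skipped. Errors are collected and logged rather than
   * rethrown.
   * @return true if no errors were encountered while starting or stopping region servers
   */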
  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    toStart.addAll(initial.getLiveServerMetrics().keySet());
    toKill.addAll(current.getLiveServerMetrics().keySet());

    ServerName master = initial.getMasterName();

    for (ServerName server : current.getLiveServerMetrics().keySet()) {
      toStart.remove(server);
    }
    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
      toKill.remove(server);
    }

    List<IOException> deferred = new ArrayList<>();

    for (ServerName sn : toStart) {
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
          startRegionServer(sn.getHostname(), sn.getPort());
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    for (ServerName sn : toKill) {
      try {
        if (
          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
          stopRegionServer(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }
    if (!deferred.isEmpty()) {
      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
      for (int i = 0; i < deferred.size() && i < 3; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }

    return deferred.isEmpty();
  }

  protected boolean restoreAdmin() throws IOException {
    // While restoring above, if the HBase Master that was initially the active one was down and
    // the restore put the cluster back to the initial configuration, the Admin instance will need
    // to refresh its connections (otherwise it will return incorrect information), or we can
    // point it to a new instance.
    try {
      admin.close();
    } catch (IOException ioe) {
      LOG.warn("While closing the old Admin instance", ioe);
    }
    this.admin = this.connection.getAdmin();
    LOG.info("Obtained a new Admin instance");
    return true;
  }
}