/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to a
 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
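 * <p>
 * A rough usage sketch follows; the host name, port, and start code are illustrative assumptions,
 * and the concrete {@link ClusterManager} (for example the ssh-based HBaseClusterManager) must be
 * configured for the target cluster. Integration tests normally obtain an instance of this class
 * through IntegrationTestingUtility rather than constructing it directly:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * ClusterManager manager = new HBaseClusterManager(); // hypothetical choice of manager
 * manager.setConf(conf);
 * DistributedHBaseCluster cluster = new DistributedHBaseCluster(conf, manager);
 * ServerName rs = ServerName.valueOf("rs1.example.com", 16020, 1L); // hypothetical region server
 * cluster.killRegionServer(rs);
 * cluster.waitForRegionServerToStop(rs, 60000);
 * cluster.restoreClusterMetrics(cluster.getInitialClusterMetrics()); // best-effort restore
 * cluster.close();
 * </pre>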
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseClusterInterface {

  private static final Logger LOG = LoggerFactory.getLogger(DistributedHBaseCluster.class);

  private Admin admin;
  private final Connection connection;

  private ClusterManager clusterManager;
  /**
   * List of RegionServers killed so far. ServerName also includes the start code of a server, so
   * any restarted instance of the same server will have a different ServerName and will not
   * coincide with past dead ones. So there is no need to clean up this list.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();

  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
    throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.connection = ConnectionFactory.createConnection(conf);
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterMetrics();
  }

  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns the ClusterMetrics for this HBase cluster.
   */
  @Override
  public ClusterMetrics getClusterMetrics() throws IOException {
    return admin.getClusterMetrics();
  }

  @Override
  public ClusterMetrics getInitialClusterMetrics() throws IOException {
    return initialClusterStatus;
  }

  @Override
  public void close() throws IOException {
    if (this.admin != null) {
      admin.close();
    }
    if (this.connection != null && !this.connection.isClosed()) {
      this.connection.close();
    }
  }

  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
    LOG.info("Starting RS on: {}", hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
  }

  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: {}", serverName.getServerName());
    killedRegionServers.add(serverName);
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public boolean isKilledRS(ServerName serverName) {
    return killedRegionServers.contains(serverName);
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void suspendRegionServer(ServerName serverName) throws IOException {
    LOG.info("Suspend RS: {}", serverName.getServerName());
    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void resumeRegionServer(ServerName serverName) throws IOException {
    LOG.info("Resume RS: {}", serverName.getServerName());
    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.info("Starting ZooKeeper node on: {}", hostname);
    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
  }

  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.info("Starting data node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.info("Aborting data node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.info("Stopping data node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.info("Starting name node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.info("Aborting name node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.info("Stopping name node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void startJournalNode(ServerName serverName) throws IOException {
    LOG.info("Starting journal node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killJournalNode(ServerName serverName) throws IOException {
    LOG.info("Aborting journal node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopJournalNode(ServerName serverName) throws IOException {
    LOG.info("Stopping journal node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  @Override
  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to stop: " + serverName);
  }

  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to start: " + serverName);
  }

  @Override
  public void startMaster(String hostname, int port) throws IOException {
    LOG.info("Starting Master on: {}:{}", hostname, port);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      try {
        connection.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.HBASE_VERSION));
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet " + m);
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK " + e);
      }
      Threads.sleep(1000);
    }
    return false;
  }

  @Override
  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
    byte[] startKey = RegionInfo.getStartKey(regionName);
    HRegionLocation regionLoc = null;
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      regionLoc = locator.getRegionLocation(startKey, true);
    }
    if (regionLoc == null) {
      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
      return null;
    }
    return regionLoc.getServerName();
  }

  @Override
  public void waitUntilShutDown() {
    // Simply wait for a few seconds for now (after issuing serverManager.kill)
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public void shutdown() throws IOException {
    // not sure we want this
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  @Override
  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
    ClusterMetrics current = getClusterMetrics();

    LOG.info("Restoring cluster - started");

    // do a best effort restore
    boolean success = restoreMasters(initial, current);
    success = restoreRegionServers(initial, current) && success;
    success = restoreAdmin() && success;

    LOG.info("Restoring cluster - done");
    return success;
  }

  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
    List<IOException> deferred = new ArrayList<>();
    // check whether the current master has changed
    final ServerName initMaster = initial.getMasterName();
    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
        initMaster.getAddress(), current.getMasterName().getAddress());
      // If the initial master is stopped, start it before restoring the state.
      // It will come up as a backup master if there is already an active master.
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
            initMaster.getPort())
        ) {
          LOG.info("Restoring cluster - starting initial active master at:{}",
            initMaster.getAddress());
          startMaster(initMaster.getHostname(), initMaster.getPort());
        }

        // master has changed, we would like to undo this.
        // 1. Kill the current backups
        // 2. Stop current master
        // 3. Start backup masters
        for (ServerName currentBackup : current.getBackupMasterNames()) {
          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
            stopMaster(currentBackup);
          }
        }
        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
        stopMaster(current.getMasterName());
        waitForActiveAndReadyMaster(); // wait so that active master takes over
      } catch (IOException ex) {
        // if we fail to start the initial active master, we do not want to continue stopping
        // backup masters. Just keep what we have now.
        deferred.add(ex);
      }

      // start backup masters
      for (ServerName backup : initial.getBackupMasterNames()) {
        try {
          // these are not started in backup mode, but we should already have an active master
          if (
            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
              backup.getPort())
          ) {
            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
            startMaster(backup.getHostname(), backup.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    } else {
      // current master has not changed, match up backup masters
      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      toStart.addAll(initial.getBackupMasterNames());
      toKill.addAll(current.getBackupMasterNames());

      for (ServerName server : current.getBackupMasterNames()) {
        toStart.remove(server);
      }
      for (ServerName server : initial.getBackupMasterNames()) {
        toKill.remove(server);
      }

      for (ServerName sn : toStart) {
        try {
          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
            startMaster(sn.getHostname(), sn.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }

      for (ServerName sn : toKill) {
        try {
          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
            stopMaster(sn);
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    }
    if (!deferred.isEmpty()) {
      LOG.warn("Restoring cluster - restoring masters reported {} errors:", deferred.size());
      for (int i = 0; i < deferred.size() && i < 3; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }

    return deferred.isEmpty();
  }

  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (compare != 0) return compare;
      compare = o1.getPort() - o2.getPort();
      if (compare != 0) return compare;
      return 0;
    }
  }

  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    toStart.addAll(initial.getLiveServerMetrics().keySet());
    toKill.addAll(current.getLiveServerMetrics().keySet());

    ServerName master = initial.getMasterName();

    for (ServerName server : current.getLiveServerMetrics().keySet()) {
      toStart.remove(server);
    }
    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
      toKill.remove(server);
    }

    List<IOException> deferred = new ArrayList<>();

    for (ServerName sn : toStart) {
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
          startRegionServer(sn.getHostname(), sn.getPort());
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    for (ServerName sn : toKill) {
      try {
        if (
          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
          stopRegionServer(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }
    if (!deferred.isEmpty()) {
      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
      for (int i = 0; i < deferred.size() && i < 3; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }

    return deferred.isEmpty();
  }

  protected boolean restoreAdmin() throws IOException {
    // While restoring above, if the HBase Master which was initially the active one was down and
    // the restore put the cluster back to its initial configuration, the Admin instance will need
    // to refresh its connections (otherwise it will return incorrect information), or we can
    // point it to the new instance.
    admin.close();
    this.admin = this.connection.getAdmin();
    LOG.info("Added new HBaseAdmin");
    return true;
  }
}