/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to a
 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseClusterInterface {

  private static final Logger LOG = LoggerFactory.getLogger(DistributedHBaseCluster.class);

  private Admin admin;
  private final Connection connection;

  private ClusterManager clusterManager;
  /**
   * List of RegionServers killed so far. ServerName also includes the start code of a server, so
   * any restarted instance of the same server will have a different ServerName and will not
   * coincide with past dead ones. So there's no need to clean up this list.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();
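
  /*
   * Typical wiring, shown here only as a hedged sketch: integration tests normally obtain this
   * class through IntegrationTestingUtility rather than constructing it by hand, and
   * HBaseClusterManager (the SSH-based ClusterManager shipped in hbase-it) is just one possible
   * manager implementation. The snippet below is illustrative, not prescriptive.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HBaseClusterManager manager = new HBaseClusterManager();
   *   manager.setConf(conf);
   *   DistributedHBaseCluster cluster = new DistributedHBaseCluster(conf, manager);
   *   try {
   *     ClusterMetrics before = cluster.getInitialClusterMetrics();
   *     // ... perturb the cluster, run verification ...
   *     cluster.restoreClusterMetrics(before);
   *   } finally {
   *     cluster.close();
   *   }
   */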

  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
    throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.connection = ConnectionFactory.createConnection(conf);
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterMetrics();
  }

  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns a ClusterMetrics for this HBase cluster.
   */
  @Override
  public ClusterMetrics getClusterMetrics() throws IOException {
    return admin.getClusterMetrics();
  }

  @Override
  public ClusterMetrics getInitialClusterMetrics() throws IOException {
    return initialClusterStatus;
  }

  @Override
  public void close() throws IOException {
    if (this.admin != null) {
      admin.close();
    }
    if (this.connection != null && !this.connection.isClosed()) {
      this.connection.close();
    }
  }

  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
    LOG.info("Starting RS on: {}", hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
  }

  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: {}", serverName.getServerName());
    killedRegionServers.add(serverName);
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public boolean isKilledRS(ServerName serverName) {
    return killedRegionServers.contains(serverName);
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void waitForRegionServerToSuspend(ServerName serverName, long timeout) throws IOException {
    waitForServiceToSuspend(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void waitForRegionServerToResume(ServerName serverName, long timeout) throws IOException {
    waitForServiceToResume(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void suspendRegionServer(ServerName serverName) throws IOException {
    LOG.info("Suspend RS: {}", serverName.getServerName());
    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void resumeRegionServer(ServerName serverName) throws IOException {
    LOG.info("Resume RS: {}", serverName.getServerName());
    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.info("Starting ZooKeeper node on: {}", hostname);
    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
  }

  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.info("Starting data node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.info("Aborting data node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.info("Stopping data node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.info("Starting name node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.info("Aborting name node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.info("Stopping name node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void startJournalNode(ServerName serverName) throws IOException {
    LOG.info("Starting journal node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killJournalNode(ServerName serverName) throws IOException {
    LOG.info("Aborting journal node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopJournalNode(ServerName serverName) throws IOException {
    LOG.info("Stopping journal node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  @Override
  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to stop: " + serverName);
  }

  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to start: " + serverName);
  }

  private void waitForServiceToSuspend(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to suspend: {}", service, serverName.getServerName());
    long start = System.currentTimeMillis();

    while ((System.currentTimeMillis() - start) < timeout) {
      if (clusterManager.isSuspended(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to suspend: " + serverName);
  }

  private void waitForServiceToResume(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to resume: {}", service, serverName.getServerName());
    long start = System.currentTimeMillis();

    while ((System.currentTimeMillis() - start) < timeout) {
      if (clusterManager.isResumed(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to resume: " + serverName);
  }

  @Override
  public void startMaster(String hostname, int port) throws IOException {
    LOG.info("Starting Master on: {}:{}", hostname, port);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      try {
        connection.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.HBASE_VERSION));
        return true;
      } catch (MasterNotRunningException m) {
        LOG.warn("Master not started yet " + m);
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK " + e);
      }
      Threads.sleep(1000);
    }
    return false;
  }

  @Override
  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
    byte[] startKey = RegionInfo.getStartKey(regionName);
    HRegionLocation regionLoc = null;
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      regionLoc = locator.getRegionLocation(startKey, true);
    }
    if (regionLoc == null) {
      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
      return null;
    }
    return regionLoc.getServerName();
  }

  @Override
  public void waitUntilShutDown() {
    // Simply wait for a few seconds for now (after issuing serverManager.kill)
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public void shutdown() throws IOException {
    // not sure we want this
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  @Override
  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
    ClusterMetrics current = getClusterMetrics();

    LOG.info("Restoring cluster - started");

    // do a best effort restore
    boolean success = restoreMasters(initial, current);
    success = restoreRegionServers(initial, current) && success;
    success = restoreAdmin() && success;

    LOG.info("Restoring cluster - done");
    return success;
  }

  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
    List<IOException> deferred = new ArrayList<>();
    // check whether current master has changed
    final ServerName initMaster = initial.getMasterName();
    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
        initMaster.getAddress(), current.getMasterName().getAddress());
      // If the initial master is stopped, start it before restoring the state.
      // It will come up as a backup master if there is already an active master.
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
            initMaster.getPort())
        ) {
          LOG.info("Restoring cluster - starting initial active master at:{}",
            initMaster.getAddress());
          startMaster(initMaster.getHostname(), initMaster.getPort());
        }

        // master has changed, we would like to undo this.
        // 1. Kill the current backups
        // 2. Stop current master
        // 3. Start backup masters
        for (ServerName currentBackup : current.getBackupMasterNames()) {
          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
            stopMaster(currentBackup);
          }
        }
        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
        stopMaster(current.getMasterName());
        waitForActiveAndReadyMaster(); // wait so that active master takes over
      } catch (IOException ex) {
        // if we fail to start the initial active master, we do not want to continue stopping
        // backup masters. Just keep what we have now
        deferred.add(ex);
      }

      // start backup masters
      for (ServerName backup : initial.getBackupMasterNames()) {
        try {
          // these are not started in backup mode, but we should already have an active master
          if (
            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
              backup.getPort())
          ) {
            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
            startMaster(backup.getHostname(), backup.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    } else {
      // current master has not changed, match up backup masters
      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      toStart.addAll(initial.getBackupMasterNames());
      toKill.addAll(current.getBackupMasterNames());

      for (ServerName server : current.getBackupMasterNames()) {
        toStart.remove(server);
      }
      for (ServerName server : initial.getBackupMasterNames()) {
        toKill.remove(server);
      }

      for (ServerName sn : toStart) {
        try {
          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
            startMaster(sn.getHostname(), sn.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }

      for (ServerName sn : toKill) {
        try {
          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
            stopMaster(sn);
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    }
    if (!deferred.isEmpty()) {
      LOG.warn("Restoring cluster - restoring masters reported {} errors:", deferred.size());
      for (int i = 0; i < deferred.size() && i < 3; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }

    return deferred.isEmpty();
  }

  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (compare != 0) return compare;
      compare = o1.getPort() - o2.getPort();
      if (compare != 0) return compare;
      return 0;
    }
  }

  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    toStart.addAll(initial.getLiveServerMetrics().keySet());
    toKill.addAll(current.getLiveServerMetrics().keySet());

    ServerName master = initial.getMasterName();

    for (ServerName server : current.getLiveServerMetrics().keySet()) {
      toStart.remove(server);
    }
    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
      toKill.remove(server);
    }

    List<IOException> deferred = new ArrayList<>();

    for (ServerName sn : toStart) {
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
          startRegionServer(sn.getHostname(), sn.getPort());
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    for (ServerName sn : toKill) {
      try {
        if (
          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - stopping initial region server: {}", sn.getAddress());
          stopRegionServer(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }
    if (!deferred.isEmpty()) {
      LOG.warn("Restoring cluster - restoring region servers reported {} errors:", deferred.size());
      for (int i = 0; i < deferred.size() && i < 3; i++) {
        LOG.warn(Objects.toString(deferred.get(i)));
      }
    }

    return deferred.isEmpty();
  }

  protected boolean restoreAdmin() throws IOException {
    // While restoring above, if the HBase Master that was initially the active one was down and
    // the restore put the cluster back to its initial configuration, the Admin instance needs to
    // refresh its connections (otherwise it will return incorrect information), so point it to a
    // new instance.
    admin.close();
    this.admin = this.connection.getAdmin();
    LOG.info("Added new HBaseAdmin");
    return true;
  }
}