/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
 * Manages the interactions with an already deployed distributed cluster (as opposed to a
 * pseudo-distributed, or mini/local cluster). This is used by integration and system tests.
 * <p>
 * Process-level operations (start/stop/kill/suspend/resume and liveness checks) are delegated to
 * the configured {@link ClusterManager}; cluster state is read through an {@link Admin} obtained
 * from a {@link Connection} that this class creates and owns.
 */
@InterfaceAudience.Private
public class DistributedHBaseCluster extends HBaseCluster {
  /** Not final: replaced with a fresh instance by {@link #restoreAdmin()}. */
  private Admin admin;
  private final Connection connection;

  private ClusterManager clusterManager;
  /**
   * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any
   * restarted instances of the same server will have different ServerName and will not coincide
   * with past dead ones. So there's no need to cleanup this list.
   */
  private final Set<ServerName> killedRegionServers = new HashSet<>();

  /**
   * Connects to the cluster described by {@code conf} and records its current metrics as the
   * initial cluster state.
   * @param conf           configuration used to create the cluster connection
   * @param clusterManager manager used to start, stop and probe cluster processes
   * @throws IOException if the connection, the admin, or the initial metrics cannot be obtained
   */
  public DistributedHBaseCluster(Configuration conf, ClusterManager clusterManager)
    throws IOException {
    super(conf);
    this.clusterManager = clusterManager;
    this.connection = ConnectionFactory.createConnection(conf);
    this.admin = this.connection.getAdmin();
    this.initialClusterStatus = getClusterMetrics();
  }

  public void setClusterManager(ClusterManager clusterManager) {
    this.clusterManager = clusterManager;
  }

  public ClusterManager getClusterManager() {
    return clusterManager;
  }

  /**
   * Returns the current ClusterMetrics for this HBase cluster, as reported by the admin.
   */
  @Override
  public ClusterMetrics getClusterMetrics() throws IOException {
    return admin.getClusterMetrics();
  }

  /**
   * Returns the ClusterMetrics captured when this object was constructed.
   */
  @Override
  public ClusterMetrics getInitialClusterMetrics() throws IOException {
    return initialClusterStatus;
  }

  /**
   * Closes the admin and then the underlying connection. The connection is closed even when
   * closing the admin fails, so a failing admin does not leak the connection.
   */
  @Override
  public void close() throws IOException {
    try {
      if (this.admin != null) {
        admin.close();
      }
    } finally {
      if (this.connection != null && !this.connection.isClosed()) {
        this.connection.close();
      }
    }
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminProtocol(ServerName serverName)
    throws IOException {
    return ((ClusterConnection) this.connection).getAdmin(serverName);
  }

  @Override
  public ClientProtos.ClientService.BlockingInterface getClientProtocol(ServerName serverName)
    throws IOException {
    return ((ClusterConnection) this.connection).getClient(serverName);
  }

  // ---------------------------------------------------------------------------------------------
  // Region server process control. Each call logs the action and delegates to the ClusterManager.
  // ---------------------------------------------------------------------------------------------

  @Override
  public void startRegionServer(String hostname, int port) throws IOException {
    LOG.info("Starting RS on: {}", hostname);
    clusterManager.start(ServiceType.HBASE_REGIONSERVER, hostname, port);
  }

  /**
   * Kills the given region server and remembers it so that {@link #isKilledRS(ServerName)}
   * reports it afterwards. The server is recorded before the kill is issued.
   */
  @Override
  public void killRegionServer(ServerName serverName) throws IOException {
    LOG.info("Aborting RS: {}", serverName.getServerName());
    killedRegionServers.add(serverName);
    clusterManager.kill(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public boolean isKilledRS(ServerName serverName) {
    return killedRegionServers.contains(serverName);
  }

  @Override
  public void stopRegionServer(ServerName serverName) throws IOException {
    LOG.info("Stopping RS: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_REGIONSERVER, serverName, timeout);
  }

  @Override
  public void suspendRegionServer(ServerName serverName) throws IOException {
    LOG.info("Suspend RS: {}", serverName.getServerName());
    clusterManager.suspend(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void resumeRegionServer(ServerName serverName) throws IOException {
    LOG.info("Resume RS: {}", serverName.getServerName());
    clusterManager.resume(ServiceType.HBASE_REGIONSERVER, serverName.getHostname(),
      serverName.getPort());
  }

  // ---------------------------------------------------------------------------------------------
  // ZooKeeper node process control.
  // ---------------------------------------------------------------------------------------------

  @Override
  public void startZkNode(String hostname, int port) throws IOException {
    LOG.info("Starting ZooKeeper node on: {}", hostname);
    clusterManager.start(ServiceType.ZOOKEEPER_SERVER, hostname, port);
  }

  @Override
  public void killZkNode(ServerName serverName) throws IOException {
    LOG.info("Aborting ZooKeeper node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopZkNode(ServerName serverName) throws IOException {
    LOG.info("Stopping ZooKeeper node: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.ZOOKEEPER_SERVER, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  @Override
  public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.ZOOKEEPER_SERVER, serverName, timeout);
  }

  // ---------------------------------------------------------------------------------------------
  // HDFS data node process control.
  // ---------------------------------------------------------------------------------------------

  @Override
  public void startDataNode(ServerName serverName) throws IOException {
    LOG.info("Starting data node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killDataNode(ServerName serverName) throws IOException {
    LOG.info("Aborting data node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopDataNode(ServerName serverName) throws IOException {
    LOG.info("Stopping data node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_DATANODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  @Override
  public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_DATANODE, serverName, timeout);
  }

  // ---------------------------------------------------------------------------------------------
  // HDFS name node process control.
  // ---------------------------------------------------------------------------------------------

  @Override
  public void startNameNode(ServerName serverName) throws IOException {
    LOG.info("Starting name node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killNameNode(ServerName serverName) throws IOException {
    LOG.info("Aborting name node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopNameNode(ServerName serverName) throws IOException {
    LOG.info("Stopping name node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_NAMENODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  @Override
  public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_NAMENODE, serverName, timeout);
  }

  // ---------------------------------------------------------------------------------------------
  // HDFS journal node process control.
  // ---------------------------------------------------------------------------------------------

  @Override
  public void startJournalNode(ServerName serverName) throws IOException {
    LOG.info("Starting journal node on: {}", serverName.getServerName());
    clusterManager.start(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void killJournalNode(ServerName serverName) throws IOException {
    LOG.info("Aborting journal node on: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void stopJournalNode(ServerName serverName) throws IOException {
    LOG.info("Stopping journal node on: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HADOOP_JOURNALNODE, serverName.getHostname(),
      serverName.getPort());
  }

  @Override
  public void waitForJournalNodeToStart(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStart(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  @Override
  public void waitForJournalNodeToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HADOOP_JOURNALNODE, serverName, timeout);
  }

  /**
   * Polls the cluster manager every 100 ms until the service is no longer reported as running.
   * @param service    type of service being waited on
   * @param serverName host and port of the service instance
   * @param timeout    maximum time to wait, in the units of
   *                   {@link EnvironmentEdgeManager#currentTime()}
   * @throws IOException if the service is still running when the timeout elapses, or if the
   *                     liveness check itself fails
   */
  private void waitForServiceToStop(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to stop: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (!clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to stop: " + serverName);
  }

  /**
   * Polls the cluster manager every 100 ms until the service is reported as running.
   * @param service    type of service being waited on
   * @param serverName host and port of the service instance
   * @param timeout    maximum time to wait, in the units of
   *                   {@link EnvironmentEdgeManager#currentTime()}
   * @throws IOException if the service is still not running when the timeout elapses, or if the
   *                     liveness check itself fails
   */
  private void waitForServiceToStart(ServiceType service, ServerName serverName, long timeout)
    throws IOException {
    LOG.info("Waiting for service: {} to start: {}", service, serverName.getServerName());
    long start = EnvironmentEdgeManager.currentTime();

    while ((EnvironmentEdgeManager.currentTime() - start) < timeout) {
      if (clusterManager.isRunning(service, serverName.getHostname(), serverName.getPort())) {
        return;
      }
      Threads.sleep(100);
    }
    throw new IOException("Timed-out waiting for service to start: " + serverName);
  }

  // ---------------------------------------------------------------------------------------------
  // Master process control.
  // ---------------------------------------------------------------------------------------------

  @Override
  public MasterService.BlockingInterface getMasterAdminService() throws IOException {
    return ((ClusterConnection) this.connection).getMaster();
  }

  @Override
  public void startMaster(String hostname, int port) throws IOException {
    LOG.info("Starting Master on: {}:{}", hostname, port);
    clusterManager.start(ServiceType.HBASE_MASTER, hostname, port);
  }

  @Override
  public void killMaster(ServerName serverName) throws IOException {
    LOG.info("Aborting Master: {}", serverName.getServerName());
    clusterManager.kill(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void stopMaster(ServerName serverName) throws IOException {
    LOG.info("Stopping Master: {}", serverName.getServerName());
    clusterManager.stop(ServiceType.HBASE_MASTER, serverName.getHostname(), serverName.getPort());
  }

  @Override
  public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
    waitForServiceToStop(ServiceType.HBASE_MASTER, serverName, timeout);
  }

  /**
   * Retries obtaining the master service once per second until it succeeds or the timeout
   * elapses. Only {@link MasterNotRunningException} and {@link ZooKeeperConnectionException} are
   * retried; any other {@link IOException} propagates to the caller.
   * @param timeout maximum time to wait, in the units of
   *                {@link EnvironmentEdgeManager#currentTime()}
   * @return {@code true} if the master service was obtained in time, {@code false} otherwise
   */
  @Override
  public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (EnvironmentEdgeManager.currentTime() - start < timeout) {
      try {
        getMasterAdminService();
        return true;
      } catch (MasterNotRunningException m) {
        // Pass the rendered text, not the Throwable, so the placeholder is substituted and the
        // message stays a single line as before.
        LOG.warn("Master not started yet {}", m.toString());
      } catch (ZooKeeperConnectionException e) {
        LOG.warn("Failed to connect to ZK {}", e.toString());
      }
      Threads.sleep(1000);
    }
    return false;
  }

  /**
   * Looks up the server currently hosting the given region by locating the region's start key
   * in the table.
   * @param tn         table the region belongs to
   * @param regionName full region name, from which the start key is extracted
   * @return the hosting server, or {@code null} if no location could be found
   */
  @Override
  public ServerName getServerHoldingRegion(TableName tn, byte[] regionName) throws IOException {
    byte[] startKey = RegionInfo.getStartKey(regionName);
    HRegionLocation regionLoc = null;
    try (RegionLocator locator = connection.getRegionLocator(tn)) {
      regionLoc = locator.getRegionLocation(startKey, true);
    }
    if (regionLoc == null) {
      LOG.warn("Cannot find region server holding region {}", Bytes.toStringBinary(regionName));
      return null;
    }
    return regionLoc.getServerName();
  }

  /**
   * Not implemented for a distributed cluster; always throws.
   */
  @Override
  public void waitUntilShutDown() {
    // Waiting for shutdown is not supported here; callers get an explicit failure instead.
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  /**
   * Not implemented for a distributed cluster; always throws.
   */
  @Override
  public void shutdown() throws IOException {
    // not sure we want this
    throw new RuntimeException(HConstants.NOT_IMPLEMENTED);
  }

  @Override
  public boolean isDistributedCluster() {
    return true;
  }

  /**
   * Makes a best-effort attempt to bring masters and region servers back to the layout described
   * by {@code initial}, then refreshes the admin. All three steps are always attempted, even if
   * an earlier one reports failure.
   * @param initial the cluster state to restore
   * @return {@code true} only if every step succeeded
   */
  @Override
  public boolean restoreClusterMetrics(ClusterMetrics initial) throws IOException {
    ClusterMetrics current = getClusterMetrics();

    LOG.info("Restoring cluster - started");

    // do a best effort restore
    boolean success = restoreMasters(initial, current);
    success = restoreRegionServers(initial, current) && success;
    success = restoreAdmin() && success;

    LOG.info("Restoring cluster - done");
    return success;
  }

  /**
   * Restores the active and backup masters to the initial layout. I/O failures are collected
   * and logged rather than thrown.
   * @param initial cluster state to restore
   * @param current cluster state as it is now
   * @return {@code true} if no I/O error occurred while restoring
   */
  protected boolean restoreMasters(ClusterMetrics initial, ClusterMetrics current) {
    List<IOException> deferred = new ArrayList<>();
    // check whether current master has changed
    final ServerName initMaster = initial.getMasterName();
    if (!ServerName.isSameAddress(initMaster, current.getMasterName())) {
      LOG.info("Restoring cluster - Initial active master : {} has changed to : {}",
        initMaster.getAddress(), current.getMasterName().getAddress());
      // If initial master is stopped, start it, before restoring the state.
      // It will come up as a backup master, if there is already an active master.
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_MASTER, initMaster.getHostname(),
            initMaster.getPort())
        ) {
          LOG.info("Restoring cluster - starting initial active master at:{}",
            initMaster.getAddress());
          startMaster(initMaster.getHostname(), initMaster.getPort());
        }

        // master has changed, we would like to undo this.
        // 1. Kill the current backups
        // 2. Stop current master
        // 3. Start backup masters
        for (ServerName currentBackup : current.getBackupMasterNames()) {
          if (!ServerName.isSameAddress(currentBackup, initMaster)) {
            LOG.info("Restoring cluster - stopping backup master: {}", currentBackup);
            stopMaster(currentBackup);
          }
        }
        LOG.info("Restoring cluster - stopping active master: {}", current.getMasterName());
        stopMaster(current.getMasterName());
        waitForActiveAndReadyMaster(); // wait so that active master takes over
      } catch (IOException ex) {
        // if we fail to start the initial active master, we do not want to continue stopping
        // backup masters. Just keep what we have now
        deferred.add(ex);
      }

      // start backup masters
      for (ServerName backup : initial.getBackupMasterNames()) {
        try {
          // these are not started in backup mode, but we should already have an active master
          if (
            !clusterManager.isRunning(ServiceType.HBASE_MASTER, backup.getHostname(),
              backup.getPort())
          ) {
            LOG.info("Restoring cluster - starting initial backup master: {}", backup.getAddress());
            startMaster(backup.getHostname(), backup.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    } else {
      // current master has not changed, match up backup masters
      Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
      toStart.addAll(initial.getBackupMasterNames());
      toKill.addAll(current.getBackupMasterNames());

      // Elements are removed one at a time so that matching always goes through the
      // start-code-ignoring comparator of the TreeSet.
      for (ServerName server : current.getBackupMasterNames()) {
        toStart.remove(server);
      }
      for (ServerName server : initial.getBackupMasterNames()) {
        toKill.remove(server);
      }

      for (ServerName sn : toStart) {
        try {
          if (!clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - starting initial backup master: {}", sn.getAddress());
            startMaster(sn.getHostname(), sn.getPort());
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }

      for (ServerName sn : toKill) {
        try {
          if (clusterManager.isRunning(ServiceType.HBASE_MASTER, sn.getHostname(), sn.getPort())) {
            LOG.info("Restoring cluster - stopping backup master: {}", sn.getAddress());
            stopMaster(sn);
          }
        } catch (IOException ex) {
          deferred.add(ex);
        }
      }
    }
    logDeferredErrors("masters", deferred);

    return deferred.isEmpty();
  }

  /**
   * Logs a summary of the errors collected during a restore step, followed by at most the first
   * three of them. Does nothing when the list is empty.
   * @param component human-readable name of what was being restored, used in the summary line
   * @param deferred  errors collected during the restore step
   */
  private void logDeferredErrors(String component, List<IOException> deferred) {
    if (deferred.isEmpty()) {
      return;
    }
    LOG.warn("Restoring cluster - restoring {} reported {} errors:", component, deferred.size());
    for (int i = 0; i < deferred.size() && i < 3; i++) {
      LOG.warn(Objects.toString(deferred.get(i)));
    }
  }

  /**
   * Orders server names by case-insensitive hostname and then by port, ignoring the start code,
   * so that a restarted instance of a server compares equal to its earlier incarnation.
   */
  private static class ServerNameIgnoreStartCodeComparator implements Comparator<ServerName> {
    @Override
    public int compare(ServerName o1, ServerName o2) {
      int compare = o1.getHostname().compareToIgnoreCase(o2.getHostname());
      if (compare != 0) {
        return compare;
      }
      return Integer.compare(o1.getPort(), o2.getPort());
    }
  }

  /**
   * Restores the set of live region servers to the initial layout: starts servers that were
   * live initially but are not now, and stops servers that are live now but were not initially.
   * I/O failures are collected and logged rather than thrown.
   * @param initial cluster state to restore
   * @param current cluster state as it is now
   * @return {@code true} if no I/O error occurred while restoring
   */
  protected boolean restoreRegionServers(ClusterMetrics initial, ClusterMetrics current) {
    Set<ServerName> toStart = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    Set<ServerName> toKill = new TreeSet<>(new ServerNameIgnoreStartCodeComparator());
    toStart.addAll(initial.getLiveServerMetrics().keySet());
    toKill.addAll(current.getLiveServerMetrics().keySet());

    ServerName master = initial.getMasterName();

    // Elements are removed one at a time so that matching always goes through the
    // start-code-ignoring comparator of the TreeSet.
    for (ServerName server : current.getLiveServerMetrics().keySet()) {
      toStart.remove(server);
    }
    for (ServerName server : initial.getLiveServerMetrics().keySet()) {
      toKill.remove(server);
    }

    List<IOException> deferred = new ArrayList<>();

    // NOTE(review): the master check below compares ports only, not hostnames. Presumably it is
    // meant to skip a master that also shows up as a live server - confirm.
    for (ServerName sn : toStart) {
      try {
        if (
          !clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          LOG.info("Restoring cluster - starting initial region server: {}", sn.getAddress());
          startRegionServer(sn.getHostname(), sn.getPort());
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }

    for (ServerName sn : toKill) {
      try {
        if (
          clusterManager.isRunning(ServiceType.HBASE_REGIONSERVER, sn.getHostname(), sn.getPort())
            && master.getPort() != sn.getPort()
        ) {
          // These servers were not part of the initial layout, hence they are stopped.
          LOG.info("Restoring cluster - stopping region server: {}", sn.getAddress());
          stopRegionServer(sn);
        }
      } catch (IOException ex) {
        deferred.add(ex);
      }
    }
    logDeferredErrors("region servers", deferred);

    return deferred.isEmpty();
  }

  /**
   * Replaces the admin with a fresh one obtained from the existing connection. A failure to
   * close the old admin is logged and otherwise ignored.
   * @return always {@code true}
   * @throws IOException if a new admin cannot be obtained from the connection
   */
  protected boolean restoreAdmin() throws IOException {
    // While restoring above, if the HBase Master which was initially the Active one, was down
    // and the restore put the cluster back to Initial configuration, HAdmin instance will need
    // to refresh its connections (otherwise it will return incorrect information) or we can
    // point it to new instance.
    try {
      admin.close();
    } catch (IOException ioe) {
      LOG.warn("While closing the old connection", ioe);
    }
    this.admin = this.connection.getAdmin();
    LOG.info("Added new HBaseAdmin");
    return true;
  }
}