001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.ArrayList; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.client.RegionInfoBuilder; 029import org.apache.hadoop.hbase.client.RegionReplicaUtil; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegion; 032import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 033import org.apache.hadoop.hbase.regionserver.HRegionServer; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.test.MetricsAssertHelper; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.JVMClusterUtil; 039import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 040import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 041import org.apache.hadoop.hbase.util.Threads; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.apache.yetus.audience.InterfaceStability; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; 048 049/** 050 * This class creates a single process HBase cluster. each server. The master uses the 'default' 051 * FileSystem. The RegionServers, if we are running on DistributedFilesystem, create a FileSystem 052 * instance each and will close down their instance on the way out. 053 */ 054@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 055@InterfaceStability.Evolving 056public class SingleProcessHBaseCluster extends HBaseClusterInterface { 057 private static final Logger LOG = 058 LoggerFactory.getLogger(SingleProcessHBaseCluster.class.getName()); 059 public LocalHBaseCluster hbaseCluster; 060 private static int index; 061 062 /** 063 * Start a MiniHBaseCluster. 064 * @param conf Configuration to be used for cluster 065 * @param numRegionServers initial number of region servers to start. 066 */ 067 public SingleProcessHBaseCluster(Configuration conf, int numRegionServers) 068 throws IOException, InterruptedException { 069 this(conf, 1, numRegionServers); 070 } 071 072 /** 073 * Start a MiniHBaseCluster. 074 * @param conf Configuration to be used for cluster 075 * @param numMasters initial number of masters to start. 076 * @param numRegionServers initial number of region servers to start. 077 */ 078 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers) 079 throws IOException, InterruptedException { 080 this(conf, numMasters, numRegionServers, null, null); 081 } 082 083 /** 084 * Start a MiniHBaseCluster. 085 * @param conf Configuration to be used for cluster 086 * @param numMasters initial number of masters to start. 087 * @param numRegionServers initial number of region servers to start. 088 */ 089 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numRegionServers, 090 Class<? extends HMaster> masterClass, 091 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 092 throws IOException, InterruptedException { 093 this(conf, numMasters, 0, numRegionServers, null, masterClass, regionserverClass); 094 } 095 096 /** 097 * @param rsPorts Ports that RegionServer should use; pass ports if you want to test cluster 098 * restart where for sure the regionservers come up on same address+port (but just 099 * with different startcode); by default mini hbase clusters choose new arbitrary 100 * ports on each cluster start. 101 */ 102 public SingleProcessHBaseCluster(Configuration conf, int numMasters, int numAlwaysStandByMasters, 103 int numRegionServers, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 104 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 105 throws IOException, InterruptedException { 106 super(conf); 107 108 // Hadoop 2 109 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init(); 110 111 init(numMasters, numAlwaysStandByMasters, numRegionServers, rsPorts, masterClass, 112 regionserverClass); 113 this.initialClusterStatus = getClusterMetrics(); 114 } 115 116 public Configuration getConfiguration() { 117 return this.conf; 118 } 119 120 /** 121 * Subclass so can get at protected methods (none at moment). Also, creates a FileSystem instance 122 * per instantiation. Adds a shutdown own FileSystem on the way out. Shuts down own Filesystem 123 * only, not All filesystems as the FileSystem system exit hook does. 124 */ 125 public static class MiniHBaseClusterRegionServer extends HRegionServer { 126 private Thread shutdownThread = null; 127 private User user = null; 128 /** 129 * List of RegionServers killed so far. ServerName also comprises startCode of a server, so any 130 * restarted instances of the same server will have different ServerName and will not coincide 131 * with past dead ones. So there's no need to cleanup this list. 132 */ 133 static Set<ServerName> killedServers = new HashSet<>(); 134 135 public MiniHBaseClusterRegionServer(Configuration conf) 136 throws IOException, InterruptedException { 137 super(conf); 138 this.user = User.getCurrent(); 139 } 140 141 @Override 142 protected void handleReportForDutyResponse(final RegionServerStartupResponse c) 143 throws IOException { 144 super.handleReportForDutyResponse(c); 145 // Run this thread to shutdown our filesystem on way out. 146 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); 147 } 148 149 @Override 150 public void run() { 151 try { 152 this.user.runAs(new PrivilegedAction<Object>() { 153 @Override 154 public Object run() { 155 runRegionServer(); 156 return null; 157 } 158 }); 159 } catch (Throwable t) { 160 LOG.error("Exception in run", t); 161 } finally { 162 // Run this on the way out. 163 if (this.shutdownThread != null) { 164 this.shutdownThread.start(); 165 Threads.shutdown(this.shutdownThread, 30000); 166 } 167 } 168 } 169 170 private void runRegionServer() { 171 super.run(); 172 } 173 174 @Override 175 protected void kill() { 176 killedServers.add(getServerName()); 177 super.kill(); 178 } 179 180 @Override 181 public void abort(final String reason, final Throwable cause) { 182 this.user.runAs(new PrivilegedAction<Object>() { 183 @Override 184 public Object run() { 185 abortRegionServer(reason, cause); 186 return null; 187 } 188 }); 189 } 190 191 private void abortRegionServer(String reason, Throwable cause) { 192 super.abort(reason, cause); 193 } 194 } 195 196 /** 197 * Alternate shutdown hook. Just shuts down the passed fs, not all as default filesystem hook 198 * does. 199 */ 200 static class SingleFileSystemShutdownThread extends Thread { 201 private final FileSystem fs; 202 203 SingleFileSystemShutdownThread(final FileSystem fs) { 204 super("Shutdown of " + fs); 205 this.fs = fs; 206 } 207 208 @Override 209 public void run() { 210 try { 211 LOG.info("Hook closing fs=" + this.fs); 212 this.fs.close(); 213 } catch (IOException e) { 214 LOG.warn("Running hook", e); 215 } 216 } 217 } 218 219 private void init(final int nMasterNodes, final int numAlwaysStandByMasters, 220 final int nRegionNodes, List<Integer> rsPorts, Class<? extends HMaster> masterClass, 221 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass) 222 throws IOException, InterruptedException { 223 try { 224 if (masterClass == null) { 225 masterClass = HMaster.class; 226 } 227 if (regionserverClass == null) { 228 regionserverClass = SingleProcessHBaseCluster.MiniHBaseClusterRegionServer.class; 229 } 230 231 // start up a LocalHBaseCluster 232 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, numAlwaysStandByMasters, 0, 233 masterClass, regionserverClass); 234 235 // manually add the regionservers as other users 236 for (int i = 0; i < nRegionNodes; i++) { 237 Configuration rsConf = HBaseConfiguration.create(conf); 238 if (rsPorts != null) { 239 rsConf.setInt(HConstants.REGIONSERVER_PORT, rsPorts.get(i)); 240 } 241 User user = HBaseTestingUtil.getDifferentUser(rsConf, ".hfs." + index++); 242 hbaseCluster.addRegionServer(rsConf, i, user); 243 } 244 245 hbaseCluster.startup(); 246 } catch (IOException e) { 247 shutdown(); 248 throw e; 249 } catch (Throwable t) { 250 LOG.error("Error starting cluster", t); 251 shutdown(); 252 throw new IOException("Shutting down", t); 253 } 254 } 255 256 @Override 257 public void startRegionServer(String hostname, int port) throws IOException { 258 final Configuration newConf = HBaseConfiguration.create(conf); 259 newConf.setInt(HConstants.REGIONSERVER_PORT, port); 260 startRegionServer(newConf); 261 } 262 263 @Override 264 public void killRegionServer(ServerName serverName) throws IOException { 265 HRegionServer server = getRegionServer(getRegionServerIndex(serverName)); 266 if (server instanceof MiniHBaseClusterRegionServer) { 267 LOG.info("Killing " + server.toString()); 268 ((MiniHBaseClusterRegionServer) server).kill(); 269 } else { 270 abortRegionServer(getRegionServerIndex(serverName)); 271 } 272 } 273 274 @Override 275 public boolean isKilledRS(ServerName serverName) { 276 return MiniHBaseClusterRegionServer.killedServers.contains(serverName); 277 } 278 279 @Override 280 public void stopRegionServer(ServerName serverName) throws IOException { 281 stopRegionServer(getRegionServerIndex(serverName)); 282 } 283 284 @Override 285 public void suspendRegionServer(ServerName serverName) throws IOException { 286 suspendRegionServer(getRegionServerIndex(serverName)); 287 } 288 289 @Override 290 public void resumeRegionServer(ServerName serverName) throws IOException { 291 resumeRegionServer(getRegionServerIndex(serverName)); 292 } 293 294 @Override 295 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException { 296 // ignore timeout for now 297 waitOnRegionServer(getRegionServerIndex(serverName)); 298 } 299 300 @Override 301 public void waitForRegionServerToSuspend(ServerName serverName, long timeout) throws IOException { 302 LOG.warn("Waiting for regionserver to suspend on mini cluster is not supported"); 303 } 304 305 @Override 306 public void waitForRegionServerToResume(ServerName serverName, long timeout) throws IOException { 307 LOG.warn("Waiting for regionserver to resume on mini cluster is not supported"); 308 } 309 310 @Override 311 public void startZkNode(String hostname, int port) throws IOException { 312 LOG.warn("Starting zookeeper nodes on mini cluster is not supported"); 313 } 314 315 @Override 316 public void killZkNode(ServerName serverName) throws IOException { 317 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported"); 318 } 319 320 @Override 321 public void stopZkNode(ServerName serverName) throws IOException { 322 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported"); 323 } 324 325 @Override 326 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException { 327 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported"); 328 } 329 330 @Override 331 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException { 332 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported"); 333 } 334 335 @Override 336 public void startDataNode(ServerName serverName) throws IOException { 337 LOG.warn("Starting datanodes on mini cluster is not supported"); 338 } 339 340 @Override 341 public void killDataNode(ServerName serverName) throws IOException { 342 LOG.warn("Aborting datanodes on mini cluster is not supported"); 343 } 344 345 @Override 346 public void stopDataNode(ServerName serverName) throws IOException { 347 LOG.warn("Stopping datanodes on mini cluster is not supported"); 348 } 349 350 @Override 351 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException { 352 LOG.warn("Waiting for datanodes to start on mini cluster is not supported"); 353 } 354 355 @Override 356 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException { 357 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported"); 358 } 359 360 @Override 361 public void startNameNode(ServerName serverName) throws IOException { 362 LOG.warn("Starting namenodes on mini cluster is not supported"); 363 } 364 365 @Override 366 public void killNameNode(ServerName serverName) throws IOException { 367 LOG.warn("Aborting namenodes on mini cluster is not supported"); 368 } 369 370 @Override 371 public void stopNameNode(ServerName serverName) throws IOException { 372 LOG.warn("Stopping namenodes on mini cluster is not supported"); 373 } 374 375 @Override 376 public void waitForNameNodeToStart(ServerName serverName, long timeout) throws IOException { 377 LOG.warn("Waiting for namenodes to start on mini cluster is not supported"); 378 } 379 380 @Override 381 public void waitForNameNodeToStop(ServerName serverName, long timeout) throws IOException { 382 LOG.warn("Waiting for namenodes to stop on mini cluster is not supported"); 383 } 384 385 @Override 386 public void startJournalNode(ServerName serverName) { 387 LOG.warn("Starting journalnodes on mini cluster is not supported"); 388 } 389 390 @Override 391 public void killJournalNode(ServerName serverName) { 392 LOG.warn("Aborting journalnodes on mini cluster is not supported"); 393 } 394 395 @Override 396 public void stopJournalNode(ServerName serverName) { 397 LOG.warn("Stopping journalnodes on mini cluster is not supported"); 398 } 399 400 @Override 401 public void waitForJournalNodeToStart(ServerName serverName, long timeout) { 402 LOG.warn("Waiting for journalnodes to start on mini cluster is not supported"); 403 } 404 405 @Override 406 public void waitForJournalNodeToStop(ServerName serverName, long timeout) { 407 LOG.warn("Waiting for journalnodes to stop on mini cluster is not supported"); 408 } 409 410 @Override 411 public void startMaster(String hostname, int port) throws IOException { 412 this.startMaster(); 413 } 414 415 @Override 416 public void killMaster(ServerName serverName) throws IOException { 417 abortMaster(getMasterIndex(serverName)); 418 } 419 420 @Override 421 public void stopMaster(ServerName serverName) throws IOException { 422 stopMaster(getMasterIndex(serverName)); 423 } 424 425 @Override 426 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException { 427 // ignore timeout for now 428 waitOnMaster(getMasterIndex(serverName)); 429 } 430 431 /** 432 * Starts a region server thread running 433 * @return New RegionServerThread 434 */ 435 public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { 436 final Configuration newConf = HBaseConfiguration.create(conf); 437 return startRegionServer(newConf); 438 } 439 440 private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration) 441 throws IOException { 442 User rsUser = HBaseTestingUtil.getDifferentUser(configuration, ".hfs." + index++); 443 JVMClusterUtil.RegionServerThread t = null; 444 try { 445 t = 446 hbaseCluster.addRegionServer(configuration, hbaseCluster.getRegionServers().size(), rsUser); 447 t.start(); 448 t.waitForServerOnline(); 449 } catch (InterruptedException ie) { 450 throw new IOException("Interrupted adding regionserver to cluster", ie); 451 } 452 return t; 453 } 454 455 /** 456 * Starts a region server thread and waits until its processed by master. Throws an exception when 457 * it can't start a region server or when the region server is not processed by master within the 458 * timeout. 459 * @return New RegionServerThread 460 */ 461 public JVMClusterUtil.RegionServerThread startRegionServerAndWait(long timeout) 462 throws IOException { 463 464 JVMClusterUtil.RegionServerThread t = startRegionServer(); 465 ServerName rsServerName = t.getRegionServer().getServerName(); 466 467 long start = EnvironmentEdgeManager.currentTime(); 468 ClusterMetrics clusterStatus = getClusterMetrics(); 469 while ((EnvironmentEdgeManager.currentTime() - start) < timeout) { 470 if (clusterStatus != null && clusterStatus.getLiveServerMetrics().containsKey(rsServerName)) { 471 return t; 472 } 473 Threads.sleep(100); 474 } 475 if (t.getRegionServer().isOnline()) { 476 throw new IOException("RS: " + rsServerName + " online, but not processed by master"); 477 } else { 478 throw new IOException("RS: " + rsServerName + " is offline"); 479 } 480 } 481 482 /** 483 * Cause a region server to exit doing basic clean up only on its way out. 484 * @param serverNumber Used as index into a list. 485 */ 486 public String abortRegionServer(int serverNumber) { 487 HRegionServer server = getRegionServer(serverNumber); 488 LOG.info("Aborting " + server.toString()); 489 server.abort("Aborting for tests", new Exception("Trace info")); 490 return server.toString(); 491 } 492 493 /** 494 * Shut down the specified region server cleanly 495 * @param serverNumber Used as index into a list. 496 * @return the region server that was stopped 497 */ 498 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { 499 return stopRegionServer(serverNumber, true); 500 } 501 502 /** 503 * Shut down the specified region server cleanly 504 * @param serverNumber Used as index into a list. 505 * @param shutdownFS True is we are to shutdown the filesystem as part of this regionserver's 506 * shutdown. Usually we do but you do not want to do this if you are running 507 * multiple regionservers in a test and you shut down one before end of the 508 * test. 509 * @return the region server that was stopped 510 */ 511 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, 512 final boolean shutdownFS) { 513 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 514 LOG.info("Stopping " + server.toString()); 515 server.getRegionServer().stop("Stopping rs " + serverNumber); 516 return server; 517 } 518 519 /** 520 * Suspend the specified region server 521 * @param serverNumber Used as index into a list. 522 */ 523 public JVMClusterUtil.RegionServerThread suspendRegionServer(int serverNumber) { 524 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 525 LOG.info("Suspending {}", server.toString()); 526 server.suspend(); 527 return server; 528 } 529 530 /** 531 * Resume the specified region server 532 * @param serverNumber Used as index into a list. 533 */ 534 public JVMClusterUtil.RegionServerThread resumeRegionServer(int serverNumber) { 535 JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); 536 LOG.info("Resuming {}", server.toString()); 537 server.resume(); 538 return server; 539 } 540 541 /** 542 * Wait for the specified region server to stop. Removes this thread from list of running threads. 543 * @return Name of region server that just went down. 544 */ 545 public String waitOnRegionServer(final int serverNumber) { 546 return this.hbaseCluster.waitOnRegionServer(serverNumber); 547 } 548 549 /** 550 * Starts a master thread running 551 * @return New RegionServerThread 552 */ 553 public JVMClusterUtil.MasterThread startMaster() throws IOException { 554 Configuration c = HBaseConfiguration.create(conf); 555 User user = HBaseTestingUtil.getDifferentUser(c, ".hfs." + index++); 556 557 JVMClusterUtil.MasterThread t = null; 558 try { 559 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user); 560 t.start(); 561 } catch (InterruptedException ie) { 562 throw new IOException("Interrupted adding master to cluster", ie); 563 } 564 conf.set(HConstants.MASTER_ADDRS_KEY, 565 hbaseCluster.getConfiguration().get(HConstants.MASTER_ADDRS_KEY)); 566 return t; 567 } 568 569 /** 570 * Returns the current active master, if available. 571 * @return the active HMaster, null if none is active. 572 */ 573 public HMaster getMaster() { 574 return this.hbaseCluster.getActiveMaster(); 575 } 576 577 /** 578 * Returns the current active master thread, if available. 579 * @return the active MasterThread, null if none is active. 580 */ 581 public MasterThread getMasterThread() { 582 for (MasterThread mt : hbaseCluster.getLiveMasters()) { 583 if (mt.getMaster().isActiveMaster()) { 584 return mt; 585 } 586 } 587 return null; 588 } 589 590 /** 591 * Returns the master at the specified index, if available. 592 * @return the active HMaster, null if none is active. 593 */ 594 public HMaster getMaster(final int serverNumber) { 595 return this.hbaseCluster.getMaster(serverNumber); 596 } 597 598 /** 599 * Cause a master to exit without shutting down entire cluster. 600 * @param serverNumber Used as index into a list. 601 */ 602 public String abortMaster(int serverNumber) { 603 HMaster server = getMaster(serverNumber); 604 LOG.info("Aborting " + server.toString()); 605 server.abort("Aborting for tests", new Exception("Trace info")); 606 return server.toString(); 607 } 608 609 /** 610 * Shut down the specified master cleanly 611 * @param serverNumber Used as index into a list. 612 * @return the region server that was stopped 613 */ 614 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) { 615 return stopMaster(serverNumber, true); 616 } 617 618 /** 619 * Shut down the specified master cleanly 620 * @param serverNumber Used as index into a list. 621 * @param shutdownFS True is we are to shutdown the filesystem as part of this master's 622 * shutdown. Usually we do but you do not want to do this if you are running 623 * multiple master in a test and you shut down one before end of the test. 624 * @return the master that was stopped 625 */ 626 public JVMClusterUtil.MasterThread stopMaster(int serverNumber, final boolean shutdownFS) { 627 JVMClusterUtil.MasterThread server = hbaseCluster.getMasters().get(serverNumber); 628 LOG.info("Stopping " + server.toString()); 629 server.getMaster().stop("Stopping master " + serverNumber); 630 return server; 631 } 632 633 /** 634 * Wait for the specified master to stop. Removes this thread from list of running threads. 635 * @return Name of master that just went down. 636 */ 637 public String waitOnMaster(final int serverNumber) { 638 return this.hbaseCluster.waitOnMaster(serverNumber); 639 } 640 641 /** 642 * Blocks until there is an active master and that master has completed initialization. 643 * @return true if an active master becomes available. false if there are no masters left. 644 */ 645 @Override 646 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException { 647 long start = EnvironmentEdgeManager.currentTime(); 648 while (EnvironmentEdgeManager.currentTime() - start < timeout) { 649 for (JVMClusterUtil.MasterThread mt : getMasterThreads()) { 650 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) { 651 return true; 652 } 653 } 654 Threads.sleep(100); 655 } 656 return false; 657 } 658 659 /** 660 * Returns list of master threads. 661 */ 662 public List<JVMClusterUtil.MasterThread> getMasterThreads() { 663 return this.hbaseCluster.getMasters(); 664 } 665 666 /** 667 * Returns list of live master threads (skips the aborted and the killed) 668 */ 669 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() { 670 return this.hbaseCluster.getLiveMasters(); 671 } 672 673 /** 674 * Wait for Mini HBase Cluster to shut down. 675 */ 676 public void join() { 677 this.hbaseCluster.join(); 678 } 679 680 /** 681 * Shut down the mini HBase cluster 682 */ 683 @Override 684 public void shutdown() throws IOException { 685 if (this.hbaseCluster != null) { 686 this.hbaseCluster.shutdown(); 687 } 688 } 689 690 @Override 691 public void close() throws IOException { 692 } 693 694 @Override 695 public ClusterMetrics getClusterMetrics() throws IOException { 696 HMaster master = getMaster(); 697 return master == null ? null : master.getClusterMetrics(); 698 } 699 700 private void executeFlush(HRegion region) throws IOException { 701 if (!RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { 702 return; 703 } 704 // retry 5 times if we can not flush 705 for (int i = 0; i < 5; i++) { 706 FlushResult result = region.flush(true); 707 if (result.getResult() != FlushResult.Result.CANNOT_FLUSH) { 708 return; 709 } 710 Threads.sleep(1000); 711 } 712 } 713 714 /** 715 * Call flushCache on all regions on all participating regionservers. 716 */ 717 public void flushcache() throws IOException { 718 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 719 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 720 executeFlush(r); 721 } 722 } 723 } 724 725 /** 726 * Call flushCache on all regions of the specified table. 727 */ 728 public void flushcache(TableName tableName) throws IOException { 729 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 730 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 731 if (r.getTableDescriptor().getTableName().equals(tableName)) { 732 executeFlush(r); 733 } 734 } 735 } 736 } 737 738 /** 739 * Call flushCache on all regions on all participating regionservers. 740 */ 741 public void compact(boolean major) throws IOException { 742 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 743 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 744 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 745 r.compact(major); 746 } 747 } 748 } 749 } 750 751 /** 752 * Call flushCache on all regions of the specified table. 753 */ 754 public void compact(TableName tableName, boolean major) throws IOException { 755 for (JVMClusterUtil.RegionServerThread t : this.hbaseCluster.getRegionServers()) { 756 for (HRegion r : t.getRegionServer().getOnlineRegionsLocalContext()) { 757 if (r.getTableDescriptor().getTableName().equals(tableName)) { 758 if (RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 759 r.compact(major); 760 } 761 } 762 } 763 } 764 } 765 766 /** 767 * Returns number of live region servers in the cluster currently. 768 */ 769 public int getNumLiveRegionServers() { 770 return this.hbaseCluster.getLiveRegionServers().size(); 771 } 772 773 /** 774 * Returns list of region server threads. Does not return the master even though it is also a 775 * region server. 776 */ 777 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { 778 return this.hbaseCluster.getRegionServers(); 779 } 780 781 /** 782 * Returns List of live region server threads (skips the aborted and the killed) 783 */ 784 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { 785 return this.hbaseCluster.getLiveRegionServers(); 786 } 787 788 /** 789 * Grab a numbered region server of your choice. 790 * @return region server 791 */ 792 public HRegionServer getRegionServer(int serverNumber) { 793 return hbaseCluster.getRegionServer(serverNumber); 794 } 795 796 public HRegionServer getRegionServer(ServerName serverName) { 797 return hbaseCluster.getRegionServers().stream().map(t -> t.getRegionServer()) 798 .filter(r -> r.getServerName().equals(serverName)).findFirst().orElse(null); 799 } 800 801 public List<HRegion> getRegions(byte[] tableName) { 802 return getRegions(TableName.valueOf(tableName)); 803 } 804 805 public List<HRegion> getRegions(TableName tableName) { 806 List<HRegion> ret = new ArrayList<>(); 807 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 808 HRegionServer hrs = rst.getRegionServer(); 809 for (Region region : hrs.getOnlineRegionsLocalContext()) { 810 if (region.getTableDescriptor().getTableName().equals(tableName)) { 811 ret.add((HRegion) region); 812 } 813 } 814 } 815 return ret; 816 } 817 818 /** 819 * Returns index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS 820 * carrying regionName. Returns -1 if none found. 821 */ 822 public int getServerWithMeta() { 823 return getServerWith(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); 824 } 825 826 /** 827 * Get the location of the specified region 828 * @param regionName Name of the region in bytes 829 * @return Index into List of {@link SingleProcessHBaseCluster#getRegionServerThreads()} of HRS 830 * carrying hbase:meta. Returns -1 if none found. 831 */ 832 public int getServerWith(byte[] regionName) { 833 int index = 0; 834 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 835 HRegionServer hrs = rst.getRegionServer(); 836 if (!hrs.isStopped()) { 837 Region region = hrs.getOnlineRegion(regionName); 838 if (region != null) { 839 return index; 840 } 841 } 842 index++; 843 } 844 return -1; 845 } 846 847 @Override 848 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName) 849 throws IOException { 850 int index = getServerWith(regionName); 851 if (index < 0) { 852 return null; 853 } 854 return getRegionServer(index).getServerName(); 855 } 856 857 /** 858 * Counts the total numbers of regions being served by the currently online region servers by 859 * asking each how many regions they have. Does not look at hbase:meta at all. Count includes 860 * catalog tables. 861 * @return number of regions being served by all region servers 862 */ 863 public long countServedRegions() { 864 long count = 0; 865 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) { 866 count += rst.getRegionServer().getNumberOfOnlineRegions(); 867 } 868 return count; 869 } 870 871 /** 872 * Do a simulated kill all masters and regionservers. Useful when it is impossible to bring the 873 * mini-cluster back for clean shutdown. 874 */ 875 public void killAll() { 876 // Do backups first. 877 MasterThread activeMaster = null; 878 for (MasterThread masterThread : getMasterThreads()) { 879 if (!masterThread.getMaster().isActiveMaster()) { 880 masterThread.getMaster().abort("killAll"); 881 } else { 882 activeMaster = masterThread; 883 } 884 } 885 // Do active after. 886 if (activeMaster != null) { 887 activeMaster.getMaster().abort("killAll"); 888 } 889 for (RegionServerThread rst : getRegionServerThreads()) { 890 rst.getRegionServer().abort("killAll"); 891 } 892 } 893 894 @Override 895 public void waitUntilShutDown() { 896 this.hbaseCluster.join(); 897 } 898 899 public List<HRegion> findRegionsForTable(TableName tableName) { 900 ArrayList<HRegion> ret = new ArrayList<>(); 901 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { 902 HRegionServer hrs = rst.getRegionServer(); 903 for (Region region : hrs.getRegions(tableName)) { 904 if (region.getTableDescriptor().getTableName().equals(tableName)) { 905 ret.add((HRegion) region); 906 } 907 } 908 } 909 return ret; 910 } 911 912 protected int getRegionServerIndex(ServerName serverName) { 913 // we have a small number of region servers, this should be fine for now. 914 List<RegionServerThread> servers = getRegionServerThreads(); 915 for (int i = 0; i < servers.size(); i++) { 916 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) { 917 return i; 918 } 919 } 920 return -1; 921 } 922 923 protected int getMasterIndex(ServerName serverName) { 924 List<MasterThread> masters = getMasterThreads(); 925 for (int i = 0; i < masters.size(); i++) { 926 if (masters.get(i).getMaster().getServerName().equals(serverName)) { 927 return i; 928 } 929 } 930 return -1; 931 } 932}