001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedExceptionAction; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.concurrent.CopyOnWriteArrayList; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.client.Admin; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.master.HMaster; 031import org.apache.hadoop.hbase.regionserver.HRegionServer; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.util.JVMClusterUtil; 034import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 035import org.apache.hadoop.hbase.util.Threads; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * This class creates a single process HBase cluster. One thread is created for a master and one per 042 * region server. Call {@link #startup()} to start the cluster running and {@link #shutdown()} to 043 * close it all down. {@link #join} the cluster is you want to wait on shutdown completion. 044 * <p> 045 * Runs master on port 16000 by default. Because we can't just kill the process -- not till 046 * HADOOP-1700 gets fixed and even then.... -- we need to be able to find the master with a remote 047 * client to run shutdown. To use a port other than 16000, set the hbase.master to a value of 048 * 'local:PORT': that is 'local', not 'localhost', and the port number the master should use instead 049 * of 16000. 050 */ 051@InterfaceAudience.Public 052public class LocalHBaseCluster { 053 private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class); 054 private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>(); 055 private final List<JVMClusterUtil.RegionServerThread> regionThreads = 056 new CopyOnWriteArrayList<>(); 057 private final static int DEFAULT_NO = 1; 058 /** local mode */ 059 public static final String LOCAL = "local"; 060 /** 'local:' */ 061 public static final String LOCAL_COLON = LOCAL + ":"; 062 public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports"; 063 064 private final Configuration conf; 065 private final Class<? extends HMaster> masterClass; 066 private final Class<? extends HRegionServer> regionServerClass; 067 068 /** 069 * Constructor. 070 */ 071 public LocalHBaseCluster(final Configuration conf) throws IOException { 072 this(conf, DEFAULT_NO); 073 } 074 075 /** 076 * Constructor. 077 * @param conf Configuration to use. Post construction has the master's address. 078 * @param noRegionServers Count of regionservers to start. 079 */ 080 public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException { 081 this(conf, 1, 0, noRegionServers, getMasterImplementation(conf), 082 getRegionServerImplementation(conf)); 083 } 084 085 /** 086 * Constructor. 087 * @param conf Configuration to use. Post construction has the active master address. 088 * @param noMasters Count of masters to start. 089 * @param noRegionServers Count of regionservers to start. 090 */ 091 public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers) 092 throws IOException { 093 this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf), 094 getRegionServerImplementation(conf)); 095 } 096 097 @SuppressWarnings("unchecked") 098 private static Class<? extends HRegionServer> 099 getRegionServerImplementation(final Configuration conf) { 100 return (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, 101 HRegionServer.class); 102 } 103 104 @SuppressWarnings("unchecked") 105 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) { 106 return (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, HMaster.class); 107 } 108 109 public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers, 110 final Class<? extends HMaster> masterClass, 111 final Class<? extends HRegionServer> regionServerClass) throws IOException { 112 this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass); 113 } 114 115 /** 116 * Constructor. 117 * @param conf Configuration to use. Post construction has the master's address. 118 * @param noMasters Count of masters to start. 119 * @param noRegionServers Count of regionservers to start. 120 */ 121 @SuppressWarnings("unchecked") 122 public LocalHBaseCluster(final Configuration conf, final int noMasters, 123 final int noAlwaysStandByMasters, final int noRegionServers, 124 final Class<? extends HMaster> masterClass, 125 final Class<? extends HRegionServer> regionServerClass) throws IOException { 126 this.conf = conf; 127 128 // When active, if a port selection is default then we switch to random 129 if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) { 130 if ( 131 conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT) 132 == HConstants.DEFAULT_MASTER_PORT 133 ) { 134 LOG.debug("Setting Master Port to random."); 135 conf.set(HConstants.MASTER_PORT, "0"); 136 } 137 if ( 138 conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT) 139 == HConstants.DEFAULT_REGIONSERVER_PORT 140 ) { 141 LOG.debug("Setting RegionServer Port to random."); 142 conf.set(HConstants.REGIONSERVER_PORT, "0"); 143 } 144 // treat info ports special; expressly don't change '-1' (keep off) 145 // in case we make that the default behavior. 146 if ( 147 conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1 148 && conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 149 HConstants.DEFAULT_REGIONSERVER_INFOPORT) == HConstants.DEFAULT_REGIONSERVER_INFOPORT 150 ) { 151 LOG.debug("Setting RS InfoServer Port to random."); 152 conf.set(HConstants.REGIONSERVER_INFO_PORT, "0"); 153 } 154 if ( 155 conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 156 && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT) 157 == HConstants.DEFAULT_MASTER_INFOPORT 158 ) { 159 LOG.debug("Setting Master InfoServer Port to random."); 160 conf.set(HConstants.MASTER_INFO_PORT, "0"); 161 } 162 } 163 164 this.masterClass = 165 (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, masterClass); 166 // Start the HMasters. 167 int i; 168 for (i = 0; i < noMasters; i++) { 169 addMaster(new Configuration(conf), i); 170 } 171 for (int j = 0; j < noAlwaysStandByMasters; j++) { 172 Configuration c = new Configuration(conf); 173 c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster"); 174 addMaster(c, i + j); 175 } 176 // Start the HRegionServers. 177 this.regionServerClass = (Class<? extends HRegionServer>) conf 178 .getClass(HConstants.REGION_SERVER_IMPL, regionServerClass); 179 180 for (int j = 0; j < noRegionServers; j++) { 181 addRegionServer(new Configuration(conf), j); 182 } 183 } 184 185 public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException { 186 return addRegionServer(new Configuration(conf), this.regionThreads.size()); 187 } 188 189 @SuppressWarnings("unchecked") 190 public JVMClusterUtil.RegionServerThread addRegionServer(Configuration config, final int index) 191 throws IOException { 192 // Create each regionserver with its own Configuration instance so each has 193 // its Connection instance rather than share (see HBASE_INSTANCES down in 194 // the guts of ConnectionManager). 195 JVMClusterUtil.RegionServerThread rst = 196 JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf 197 .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); 198 199 this.regionThreads.add(rst); 200 return rst; 201 } 202 203 public JVMClusterUtil.RegionServerThread addRegionServer(final Configuration config, 204 final int index, User user) throws IOException, InterruptedException { 205 return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() { 206 @Override 207 public JVMClusterUtil.RegionServerThread run() throws Exception { 208 return addRegionServer(config, index); 209 } 210 }); 211 } 212 213 public JVMClusterUtil.MasterThread addMaster() throws IOException { 214 return addMaster(new Configuration(conf), this.masterThreads.size()); 215 } 216 217 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index) 218 throws IOException { 219 // Create each master with its own Configuration instance so each has 220 // its Connection instance rather than share (see HBASE_INSTANCES down in 221 // the guts of ConnectionManager. 222 JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, 223 (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index); 224 this.masterThreads.add(mt); 225 // Refresh the master address config. 226 List<String> masterHostPorts = new ArrayList<>(); 227 getMasters().forEach(masterThread -> masterHostPorts 228 .add(masterThread.getMaster().getServerName().getAddress().toString())); 229 conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts)); 230 return mt; 231 } 232 233 public JVMClusterUtil.MasterThread addMaster(final Configuration c, final int index, User user) 234 throws IOException, InterruptedException { 235 return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() { 236 @Override 237 public JVMClusterUtil.MasterThread run() throws Exception { 238 return addMaster(c, index); 239 } 240 }); 241 } 242 243 /** Returns region server */ 244 public HRegionServer getRegionServer(int serverNumber) { 245 return regionThreads.get(serverNumber).getRegionServer(); 246 } 247 248 /** Returns Read-only list of region server threads. */ 249 public List<JVMClusterUtil.RegionServerThread> getRegionServers() { 250 return Collections.unmodifiableList(this.regionThreads); 251 } 252 253 /** 254 * @return List of running servers (Some servers may have been killed or aborted during lifetime 255 * of cluster; these servers are not included in this list). 256 */ 257 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() { 258 List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>(); 259 List<RegionServerThread> list = getRegionServers(); 260 for (JVMClusterUtil.RegionServerThread rst : list) { 261 if (rst.isAlive()) liveServers.add(rst); 262 else LOG.info("Not alive " + rst.getName()); 263 } 264 return liveServers; 265 } 266 267 /** Returns the Configuration used by this LocalHBaseCluster */ 268 public Configuration getConfiguration() { 269 return this.conf; 270 } 271 272 /** 273 * Wait for the specified region server to stop. Removes this thread from list of running threads. 274 * @return Name of region server that just went down. 275 */ 276 public String waitOnRegionServer(int serverNumber) { 277 JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber); 278 return waitOnRegionServer(regionServerThread); 279 } 280 281 /** 282 * Wait for the specified region server to stop. Removes this thread from list of running threads. 283 * @return Name of region server that just went down. 284 */ 285 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) { 286 boolean interrupted = false; 287 while (rst.isAlive()) { 288 try { 289 LOG.info("Waiting on " + rst.getRegionServer().toString()); 290 rst.join(); 291 } catch (InterruptedException e) { 292 LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e); 293 interrupted = true; 294 } 295 } 296 regionThreads.remove(rst); 297 if (interrupted) { 298 Thread.currentThread().interrupt(); 299 } 300 return rst.getName(); 301 } 302 303 /** Returns the HMaster thread */ 304 public HMaster getMaster(int serverNumber) { 305 return masterThreads.get(serverNumber).getMaster(); 306 } 307 308 /** 309 * Gets the current active master, if available. If no active master, returns null. 310 * @return the HMaster for the active master 311 */ 312 public HMaster getActiveMaster() { 313 for (JVMClusterUtil.MasterThread mt : masterThreads) { 314 // Ensure that the current active master is not stopped. 315 // We don't want to return a stopping master as an active master. 316 if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { 317 return mt.getMaster(); 318 } 319 } 320 return null; 321 } 322 323 /** Returns Read-only list of master threads. */ 324 public List<JVMClusterUtil.MasterThread> getMasters() { 325 return Collections.unmodifiableList(this.masterThreads); 326 } 327 328 /** 329 * @return List of running master servers (Some servers may have been killed or aborted during 330 * lifetime of cluster; these servers are not included in this list). 331 */ 332 public List<JVMClusterUtil.MasterThread> getLiveMasters() { 333 List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>(); 334 List<JVMClusterUtil.MasterThread> list = getMasters(); 335 for (JVMClusterUtil.MasterThread mt : list) { 336 if (mt.isAlive()) { 337 liveServers.add(mt); 338 } 339 } 340 return liveServers; 341 } 342 343 /** 344 * Wait for the specified master to stop. Removes this thread from list of running threads. 345 * @return Name of master that just went down. 346 */ 347 public String waitOnMaster(int serverNumber) { 348 JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber); 349 return waitOnMaster(masterThread); 350 } 351 352 /** 353 * Wait for the specified master to stop. Removes this thread from list of running threads. 354 * @return Name of master that just went down. 355 */ 356 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) { 357 boolean interrupted = false; 358 while (masterThread.isAlive()) { 359 try { 360 LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString()); 361 masterThread.join(); 362 } catch (InterruptedException e) { 363 LOG.error("Interrupted while waiting for {} to finish. Retrying join", 364 masterThread.getName(), e); 365 interrupted = true; 366 } 367 } 368 masterThreads.remove(masterThread); 369 if (interrupted) { 370 Thread.currentThread().interrupt(); 371 } 372 return masterThread.getName(); 373 } 374 375 /** 376 * Wait for Mini HBase Cluster to shut down. Presumes you've already called {@link #shutdown()}. 377 */ 378 public void join() { 379 if (this.regionThreads != null) { 380 for (Thread t : this.regionThreads) { 381 if (t.isAlive()) { 382 try { 383 Threads.threadDumpingIsAlive(t); 384 } catch (InterruptedException e) { 385 LOG.debug("Interrupted", e); 386 } 387 } 388 } 389 } 390 if (this.masterThreads != null) { 391 for (Thread t : this.masterThreads) { 392 if (t.isAlive()) { 393 try { 394 Threads.threadDumpingIsAlive(t); 395 } catch (InterruptedException e) { 396 LOG.debug("Interrupted", e); 397 } 398 } 399 } 400 } 401 } 402 403 /** 404 * Start the cluster. 405 */ 406 public void startup() throws IOException { 407 JVMClusterUtil.startup(this.masterThreads, this.regionThreads); 408 } 409 410 /** 411 * Shut down the mini HBase cluster 412 */ 413 public void shutdown() { 414 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads); 415 } 416 417 /** 418 * @param c Configuration to check. 419 * @return True if a 'local' address in hbase.master value. 420 */ 421 public static boolean isLocal(final Configuration c) { 422 boolean mode = 423 c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); 424 return (mode == HConstants.CLUSTER_IS_LOCAL); 425 } 426 427 /** 428 * Test things basically work. 429 */ 430 public static void main(String[] args) throws IOException { 431 Configuration conf = HBaseConfiguration.create(); 432 LocalHBaseCluster cluster = new LocalHBaseCluster(conf); 433 cluster.startup(); 434 Connection connection = ConnectionFactory.createConnection(conf); 435 Admin admin = connection.getAdmin(); 436 try { 437 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cluster.getClass().getName())); 438 admin.createTable(htd); 439 } finally { 440 admin.close(); 441 } 442 connection.close(); 443 cluster.shutdown(); 444 } 445}