001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedExceptionAction;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.CopyOnWriteArrayList;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.client.Admin;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.master.HMaster;
031import org.apache.hadoop.hbase.regionserver.HRegionServer;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.util.JVMClusterUtil;
034import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
035import org.apache.hadoop.hbase.util.Threads;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * This class creates a single process HBase cluster. One thread is created for a master and one per
042 * region server. Call {@link #startup()} to start the cluster running and {@link #shutdown()} to
043 * close it all down. {@link #join} the cluster is you want to wait on shutdown completion.
044 * <p>
045 * Runs master on port 16000 by default. Because we can't just kill the process -- not till
046 * HADOOP-1700 gets fixed and even then.... -- we need to be able to find the master with a remote
047 * client to run shutdown. To use a port other than 16000, set the hbase.master to a value of
048 * 'local:PORT': that is 'local', not 'localhost', and the port number the master should use instead
049 * of 16000.
050 */
051@InterfaceAudience.Public
052public class LocalHBaseCluster {
053  private static final Logger LOG = LoggerFactory.getLogger(LocalHBaseCluster.class);
054  private final List<JVMClusterUtil.MasterThread> masterThreads = new CopyOnWriteArrayList<>();
055  private final List<JVMClusterUtil.RegionServerThread> regionThreads =
056    new CopyOnWriteArrayList<>();
057  private final static int DEFAULT_NO = 1;
058  /** local mode */
059  public static final String LOCAL = "local";
060  /** 'local:' */
061  public static final String LOCAL_COLON = LOCAL + ":";
062  public static final String ASSIGN_RANDOM_PORTS = "hbase.localcluster.assign.random.ports";
063
064  private final Configuration conf;
065  private final Class<? extends HMaster> masterClass;
066  private final Class<? extends HRegionServer> regionServerClass;
067
068  /**
069   * Constructor.
070   */
071  public LocalHBaseCluster(final Configuration conf) throws IOException {
072    this(conf, DEFAULT_NO);
073  }
074
075  /**
076   * Constructor.
077   * @param conf            Configuration to use. Post construction has the master's address.
078   * @param noRegionServers Count of regionservers to start.
079   */
080  public LocalHBaseCluster(final Configuration conf, final int noRegionServers) throws IOException {
081    this(conf, 1, 0, noRegionServers, getMasterImplementation(conf),
082      getRegionServerImplementation(conf));
083  }
084
085  /**
086   * Constructor.
087   * @param conf            Configuration to use. Post construction has the active master address.
088   * @param noMasters       Count of masters to start.
089   * @param noRegionServers Count of regionservers to start.
090   */
091  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers)
092    throws IOException {
093    this(conf, noMasters, 0, noRegionServers, getMasterImplementation(conf),
094      getRegionServerImplementation(conf));
095  }
096
097  @SuppressWarnings("unchecked")
098  private static Class<? extends HRegionServer>
099    getRegionServerImplementation(final Configuration conf) {
100    return (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL,
101      HRegionServer.class);
102  }
103
104  @SuppressWarnings("unchecked")
105  private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
106    return (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, HMaster.class);
107  }
108
109  public LocalHBaseCluster(final Configuration conf, final int noMasters, final int noRegionServers,
110    final Class<? extends HMaster> masterClass,
111    final Class<? extends HRegionServer> regionServerClass) throws IOException {
112    this(conf, noMasters, 0, noRegionServers, masterClass, regionServerClass);
113  }
114
115  /**
116   * Constructor.
117   * @param conf            Configuration to use. Post construction has the master's address.
118   * @param noMasters       Count of masters to start.
119   * @param noRegionServers Count of regionservers to start.
120   */
121  @SuppressWarnings("unchecked")
122  public LocalHBaseCluster(final Configuration conf, final int noMasters,
123    final int noAlwaysStandByMasters, final int noRegionServers,
124    final Class<? extends HMaster> masterClass,
125    final Class<? extends HRegionServer> regionServerClass) throws IOException {
126    this.conf = conf;
127
128    // When active, if a port selection is default then we switch to random
129    if (conf.getBoolean(ASSIGN_RANDOM_PORTS, false)) {
130      if (
131        conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT)
132            == HConstants.DEFAULT_MASTER_PORT
133      ) {
134        LOG.debug("Setting Master Port to random.");
135        conf.set(HConstants.MASTER_PORT, "0");
136      }
137      if (
138        conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT)
139            == HConstants.DEFAULT_REGIONSERVER_PORT
140      ) {
141        LOG.debug("Setting RegionServer Port to random.");
142        conf.set(HConstants.REGIONSERVER_PORT, "0");
143      }
144      // treat info ports special; expressly don't change '-1' (keep off)
145      // in case we make that the default behavior.
146      if (
147        conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 0) != -1
148          && conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
149            HConstants.DEFAULT_REGIONSERVER_INFOPORT) == HConstants.DEFAULT_REGIONSERVER_INFOPORT
150      ) {
151        LOG.debug("Setting RS InfoServer Port to random.");
152        conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
153      }
154      if (
155        conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1
156          && conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
157              == HConstants.DEFAULT_MASTER_INFOPORT
158      ) {
159        LOG.debug("Setting Master InfoServer Port to random.");
160        conf.set(HConstants.MASTER_INFO_PORT, "0");
161      }
162    }
163
164    this.masterClass =
165      (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, masterClass);
166    // Start the HMasters.
167    int i;
168    for (i = 0; i < noMasters; i++) {
169      addMaster(new Configuration(conf), i);
170    }
171    for (int j = 0; j < noAlwaysStandByMasters; j++) {
172      Configuration c = new Configuration(conf);
173      c.set(HConstants.MASTER_IMPL, "org.apache.hadoop.hbase.master.AlwaysStandByHMaster");
174      addMaster(c, i + j);
175    }
176    // Start the HRegionServers.
177    this.regionServerClass = (Class<? extends HRegionServer>) conf
178      .getClass(HConstants.REGION_SERVER_IMPL, regionServerClass);
179
180    for (int j = 0; j < noRegionServers; j++) {
181      addRegionServer(new Configuration(conf), j);
182    }
183  }
184
185  public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
186    return addRegionServer(new Configuration(conf), this.regionThreads.size());
187  }
188
189  @SuppressWarnings("unchecked")
190  public JVMClusterUtil.RegionServerThread addRegionServer(Configuration config, final int index)
191    throws IOException {
192    // Create each regionserver with its own Configuration instance so each has
193    // its Connection instance rather than share (see HBASE_INSTANCES down in
194    // the guts of ConnectionManager).
195    JVMClusterUtil.RegionServerThread rst =
196      JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
197        .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
198
199    this.regionThreads.add(rst);
200    return rst;
201  }
202
203  public JVMClusterUtil.RegionServerThread addRegionServer(final Configuration config,
204    final int index, User user) throws IOException, InterruptedException {
205    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
206      @Override
207      public JVMClusterUtil.RegionServerThread run() throws Exception {
208        return addRegionServer(config, index);
209      }
210    });
211  }
212
213  public JVMClusterUtil.MasterThread addMaster() throws IOException {
214    return addMaster(new Configuration(conf), this.masterThreads.size());
215  }
216
217  public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
218    throws IOException {
219    // Create each master with its own Configuration instance so each has
220    // its Connection instance rather than share (see HBASE_INSTANCES down in
221    // the guts of ConnectionManager.
222    JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
223      (Class<? extends HMaster>) c.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
224    this.masterThreads.add(mt);
225    // Refresh the master address config.
226    List<String> masterHostPorts = new ArrayList<>();
227    getMasters().forEach(masterThread -> masterHostPorts
228      .add(masterThread.getMaster().getServerName().getAddress().toString()));
229    conf.set(HConstants.MASTER_ADDRS_KEY, String.join(",", masterHostPorts));
230    return mt;
231  }
232
233  public JVMClusterUtil.MasterThread addMaster(final Configuration c, final int index, User user)
234    throws IOException, InterruptedException {
235    return user.runAs(new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
236      @Override
237      public JVMClusterUtil.MasterThread run() throws Exception {
238        return addMaster(c, index);
239      }
240    });
241  }
242
243  /** Returns region server */
244  public HRegionServer getRegionServer(int serverNumber) {
245    return regionThreads.get(serverNumber).getRegionServer();
246  }
247
248  /** Returns Read-only list of region server threads. */
249  public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
250    return Collections.unmodifiableList(this.regionThreads);
251  }
252
253  /**
254   * @return List of running servers (Some servers may have been killed or aborted during lifetime
255   *         of cluster; these servers are not included in this list).
256   */
257  public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
258    List<JVMClusterUtil.RegionServerThread> liveServers = new ArrayList<>();
259    List<RegionServerThread> list = getRegionServers();
260    for (JVMClusterUtil.RegionServerThread rst : list) {
261      if (rst.isAlive()) liveServers.add(rst);
262      else LOG.info("Not alive " + rst.getName());
263    }
264    return liveServers;
265  }
266
267  /** Returns the Configuration used by this LocalHBaseCluster */
268  public Configuration getConfiguration() {
269    return this.conf;
270  }
271
272  /**
273   * Wait for the specified region server to stop. Removes this thread from list of running threads.
274   * @return Name of region server that just went down.
275   */
276  public String waitOnRegionServer(int serverNumber) {
277    JVMClusterUtil.RegionServerThread regionServerThread = this.regionThreads.get(serverNumber);
278    return waitOnRegionServer(regionServerThread);
279  }
280
281  /**
282   * Wait for the specified region server to stop. Removes this thread from list of running threads.
283   * @return Name of region server that just went down.
284   */
285  public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
286    boolean interrupted = false;
287    while (rst.isAlive()) {
288      try {
289        LOG.info("Waiting on " + rst.getRegionServer().toString());
290        rst.join();
291      } catch (InterruptedException e) {
292        LOG.error("Interrupted while waiting for {} to finish. Retrying join", rst.getName(), e);
293        interrupted = true;
294      }
295    }
296    regionThreads.remove(rst);
297    if (interrupted) {
298      Thread.currentThread().interrupt();
299    }
300    return rst.getName();
301  }
302
303  /** Returns the HMaster thread */
304  public HMaster getMaster(int serverNumber) {
305    return masterThreads.get(serverNumber).getMaster();
306  }
307
308  /**
309   * Gets the current active master, if available. If no active master, returns null.
310   * @return the HMaster for the active master
311   */
312  public HMaster getActiveMaster() {
313    for (JVMClusterUtil.MasterThread mt : masterThreads) {
314      // Ensure that the current active master is not stopped.
315      // We don't want to return a stopping master as an active master.
316      if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
317        return mt.getMaster();
318      }
319    }
320    return null;
321  }
322
323  /** Returns Read-only list of master threads. */
324  public List<JVMClusterUtil.MasterThread> getMasters() {
325    return Collections.unmodifiableList(this.masterThreads);
326  }
327
328  /**
329   * @return List of running master servers (Some servers may have been killed or aborted during
330   *         lifetime of cluster; these servers are not included in this list).
331   */
332  public List<JVMClusterUtil.MasterThread> getLiveMasters() {
333    List<JVMClusterUtil.MasterThread> liveServers = new ArrayList<>();
334    List<JVMClusterUtil.MasterThread> list = getMasters();
335    for (JVMClusterUtil.MasterThread mt : list) {
336      if (mt.isAlive()) {
337        liveServers.add(mt);
338      }
339    }
340    return liveServers;
341  }
342
343  /**
344   * Wait for the specified master to stop. Removes this thread from list of running threads.
345   * @return Name of master that just went down.
346   */
347  public String waitOnMaster(int serverNumber) {
348    JVMClusterUtil.MasterThread masterThread = this.masterThreads.get(serverNumber);
349    return waitOnMaster(masterThread);
350  }
351
352  /**
353   * Wait for the specified master to stop. Removes this thread from list of running threads.
354   * @return Name of master that just went down.
355   */
356  public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
357    boolean interrupted = false;
358    while (masterThread.isAlive()) {
359      try {
360        LOG.info("Waiting on " + masterThread.getMaster().getServerName().toString());
361        masterThread.join();
362      } catch (InterruptedException e) {
363        LOG.error("Interrupted while waiting for {} to finish. Retrying join",
364          masterThread.getName(), e);
365        interrupted = true;
366      }
367    }
368    masterThreads.remove(masterThread);
369    if (interrupted) {
370      Thread.currentThread().interrupt();
371    }
372    return masterThread.getName();
373  }
374
375  /**
376   * Wait for Mini HBase Cluster to shut down. Presumes you've already called {@link #shutdown()}.
377   */
378  public void join() {
379    if (this.regionThreads != null) {
380      for (Thread t : this.regionThreads) {
381        if (t.isAlive()) {
382          try {
383            Threads.threadDumpingIsAlive(t);
384          } catch (InterruptedException e) {
385            LOG.debug("Interrupted", e);
386          }
387        }
388      }
389    }
390    if (this.masterThreads != null) {
391      for (Thread t : this.masterThreads) {
392        if (t.isAlive()) {
393          try {
394            Threads.threadDumpingIsAlive(t);
395          } catch (InterruptedException e) {
396            LOG.debug("Interrupted", e);
397          }
398        }
399      }
400    }
401  }
402
403  /**
404   * Start the cluster.
405   */
406  public void startup() throws IOException {
407    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
408  }
409
410  /**
411   * Shut down the mini HBase cluster
412   */
413  public void shutdown() {
414    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
415  }
416
417  /**
418   * @param c Configuration to check.
419   * @return True if a 'local' address in hbase.master value.
420   */
421  public static boolean isLocal(final Configuration c) {
422    boolean mode =
423      c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
424    return (mode == HConstants.CLUSTER_IS_LOCAL);
425  }
426
427  /**
428   * Test things basically work.
429   */
430  public static void main(String[] args) throws IOException {
431    Configuration conf = HBaseConfiguration.create();
432    LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
433    cluster.startup();
434    Connection connection = ConnectionFactory.createConnection(conf);
435    Admin admin = connection.getAdmin();
436    try {
437      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
438      admin.createTable(htd);
439    } finally {
440      admin.close();
441    }
442    connection.close();
443    cluster.shutdown();
444  }
445}