001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.client.Admin;
023import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.ConnectionFactory;
026import org.apache.hadoop.hbase.client.TableDescriptor;
027import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
028import org.apache.hadoop.hbase.util.RegionSplitter;
029import org.apache.hadoop.util.ReflectionUtils;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Facility for <strong>integration/system</strong> tests. This extends {@link HBaseTestingUtil} and
035 * adds-in the functionality needed by integration and system tests. This class understands
036 * distributed and pseudo-distributed/local cluster deployments, and abstracts those from the tests
037 * in this module.
038 * <p>
039 * IntegrationTestingUtility is constructed and used by the integration tests, but the tests
040 * themselves should not assume a particular deployment. They can rely on the methods in this class
041 * and HBaseCluster. Before the testing begins, the test should initialize the cluster by calling
042 * {@link #initializeCluster(int)}.
043 * <p>
044 * The cluster that is used defaults to a mini cluster, but it can be forced to use a distributed
045 * cluster by calling {@link #setUseDistributedCluster(Configuration)}. This method is invoked by
046 * test drivers (maven, IntegrationTestsDriver, etc) before initializing the cluster via
047 * {@link #initializeCluster(int)}. Individual tests should not directly call
048 * {@link #setUseDistributedCluster(Configuration)}.
049 */
050public class IntegrationTestingUtility extends HBaseTestingUtil {
051  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestingUtility.class);
052
053  public IntegrationTestingUtility() {
054    this(HBaseConfiguration.create());
055  }
056
057  public IntegrationTestingUtility(Configuration conf) {
058    super(conf);
059  }
060
061  /**
062   * Configuration that controls whether this utility assumes a running/deployed cluster. This is
063   * different than "hbase.cluster.distributed" since that parameter indicates whether the cluster
064   * is in an actual distributed environment, while this shows that there is a deployed (distributed
065   * or pseudo-distributed) cluster running, and we do not need to start a mini-cluster for tests.
066   */
067  public static final String IS_DISTRIBUTED_CLUSTER = "hbase.test.cluster.distributed";
068
069  /**
070   * Config for pluggable hbase cluster manager. Pass fully-qualified class name as property value.
071   * Drop the '.class' suffix.
072   */
073  public static final String HBASE_CLUSTER_MANAGER_CLASS = "hbase.it.clustermanager.class";
074  private static final Class<? extends ClusterManager> DEFAULT_HBASE_CLUSTER_MANAGER_CLASS =
075    HBaseClusterManager.class;
076
077  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
078  /**
079   * The default number of regions per regionserver when creating a pre-split table.
080   */
081  public static final int DEFAULT_REGIONS_PER_SERVER = 3;
082
083  public static final String PRESPLIT_TEST_TABLE_KEY = "hbase.test.pre-split-table";
084  public static final boolean PRESPLIT_TEST_TABLE = true;
085
086  /**
087   * Initializes the state of the cluster. It starts a new in-process mini cluster, OR if we are
088   * given an already deployed distributed cluster it initializes the state.
089   * @param numSlaves Number of slaves to start up if we are booting a mini cluster. Otherwise we
090   *                  check whether this many nodes are available and throw an exception if not.
091   */
092  public void initializeCluster(int numSlaves) throws Exception {
093    if (isDistributedCluster()) {
094      createDistributedHBaseCluster();
095      checkNodeCount(numSlaves);
096    } else {
097      startMiniCluster(numSlaves);
098    }
099  }
100
101  /**
102   * Checks whether we have more than numSlaves nodes. Throws an exception otherwise.
103   */
104  public void checkNodeCount(int numSlaves) throws Exception {
105    HBaseClusterInterface cluster = getHBaseClusterInterface();
106    if (cluster.getClusterMetrics().getLiveServerMetrics().size() < numSlaves) {
107      throw new Exception("Cluster does not have enough nodes:" + numSlaves);
108    }
109  }
110
111  /**
112   * Restores the cluster to the initial state if it is a distributed cluster, otherwise, shutdowns
113   * the mini cluster.
114   */
115  public void restoreCluster() throws IOException {
116    if (isDistributedCluster()) {
117      getHBaseClusterInterface().restoreInitialStatus();
118    } else {
119      try {
120        shutdownMiniCluster();
121      } catch (Exception e) {
122        // re-wrap into IOException
123        throw new IOException(e);
124      }
125    }
126  }
127
128  /**
129   * Sets the configuration property to use a distributed cluster for the integration tests. Test
130   * drivers should use this to enforce cluster deployment.
131   */
132  public static void setUseDistributedCluster(Configuration conf) {
133    conf.setBoolean(IS_DISTRIBUTED_CLUSTER, true);
134    System.setProperty(IS_DISTRIBUTED_CLUSTER, "true");
135  }
136
137  /**
138   * Returns whether we are interacting with a distributed cluster as opposed to and in-process mini
139   * cluster or a local cluster.
140   * @see IntegrationTestingUtility#setUseDistributedCluster(Configuration)
141   */
142  public boolean isDistributedCluster() {
143    Configuration conf = getConfiguration();
144    boolean isDistributedCluster =
145      Boolean.parseBoolean(System.getProperty(IS_DISTRIBUTED_CLUSTER, "false"));
146    if (!isDistributedCluster) {
147      isDistributedCluster = conf.getBoolean(IS_DISTRIBUTED_CLUSTER, false);
148    }
149    return isDistributedCluster;
150  }
151
152  public void createDistributedHBaseCluster() throws IOException {
153    // if it is a distributed HBase cluster, use the conf provided by classpath
154    // to set hbase dir and fs.defaultFS.
155    // Since when the super class HBaseTestingUtility initializing, it will
156    // change hbase.rootdir to a local test dir.
157    // we use "original.defaultFS" and "original.hbase.dir" to restore them.
158    Configuration conf = getConfiguration();
159    if (conf.get("original.defaultFS") != null) {
160      conf.set("fs.defaultFS", conf.get("original.defaultFS"));
161    }
162    if (conf.get("original.hbase.dir") != null) {
163      conf.set(HConstants.HBASE_DIR, conf.get("original.hbase.dir"));
164    }
165    LOG.debug("Setting {} to {} since it is a distributed cluster", HConstants.HBASE_DIR,
166      conf.get(HConstants.HBASE_DIR));
167    Class<? extends ClusterManager> clusterManagerClass = conf.getClass(HBASE_CLUSTER_MANAGER_CLASS,
168      DEFAULT_HBASE_CLUSTER_MANAGER_CLASS, ClusterManager.class);
169    LOG.info("Instantiating {}", clusterManagerClass.getName());
170    ClusterManager clusterManager = ReflectionUtils.newInstance(clusterManagerClass, conf);
171    setHBaseCluster(new DistributedHBaseCluster(conf, clusterManager));
172    getAdmin();
173  }
174
175  /**
176   * Creates a pre-split table for load testing. If the table already exists, logs a warning and
177   * continues.
178   * @return the number of regions the table was split into
179   */
180  public static int createPreSplitLoadTestTable(Configuration conf, TableDescriptor td,
181    ColumnFamilyDescriptor[] cds, int numRegionsPerServer) throws IOException {
182    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(td);
183    for (ColumnFamilyDescriptor cd : cds) {
184      if (!td.hasColumnFamily(cd.getName())) {
185        builder.setColumnFamily(cd);
186      }
187    }
188    td = builder.build();
189    int totalNumberOfRegions = 0;
190    Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
191    Admin admin = unmanagedConnection.getAdmin();
192
193    try {
194      // create a table a pre-splits regions.
195      // The number of splits is set as:
196      // region servers * regions per region server).
197      int numberOfServers = admin.getRegionServers().size();
198      if (numberOfServers == 0) {
199        throw new IllegalStateException("No live regionservers");
200      }
201
202      totalNumberOfRegions = numberOfServers * numRegionsPerServer;
203      LOG.info("Number of live regionservers: " + numberOfServers + ", "
204        + "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: "
205        + numRegionsPerServer + ")");
206
207      byte[][] splits = new RegionSplitter.HexStringSplit().split(totalNumberOfRegions);
208
209      admin.createTable(td, splits);
210    } catch (MasterNotRunningException e) {
211      LOG.error("Master not running", e);
212      throw new IOException(e);
213    } catch (TableExistsException e) {
214      LOG.warn("Table " + td.getTableName() + " already exists, continuing");
215    } finally {
216      admin.close();
217      unmanagedConnection.close();
218    }
219    return totalNumberOfRegions;
220  }
221}