/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MasterRegistry;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase
 * functionality. Create an instance and keep it around while testing HBase.
 * <p/>
 * This class is meant to be your one-stop shop for anything you might need testing. Manages one
 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster},
 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real
 * cluster.
 * <p/>
 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration.
 * <p/>
 * It does not set logging levels.
 * <p/>
 * In the configuration properties, default values for master-info-port and region-server-port are
 * overridden such that a random port will be assigned (thus avoiding port contention if another
 * local HBase instance is already running).
 * <p/>
 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir"
 * setting it to true.
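 * <p/>
 * A typical lifecycle, sketched below (the table and family names are illustrative):
 *
 * <pre>
 * HBaseTestingUtil util = new HBaseTestingUtil();
 * util.startMiniCluster();
 * try (Table table = util.createTable(TableName.valueOf("test"), "cf")) {
 *   // exercise the cluster
 * } finally {
 *   util.shutdownMiniCluster();
 * }
 * </pre>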
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX)
@InterfaceStability.Evolving
public class HBaseTestingUtil extends HBaseZKTestingUtil {

  public static final int DEFAULT_REGIONS_PER_SERVER = 3;

  private MiniDFSCluster dfsCluster = null;
  private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null;

  private volatile HBaseClusterInterface hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** If there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /**
   * Directory on test filesystem where we put the data for this instance of HBaseTestingUtil
   */
  private Path dataTestDirOnTestFS = null;

  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
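
  // A hedged sketch of how a parameterized JUnit test might consume the list above
  // (the data() method name is illustrative):
  //
  //   @Parameterized.Parameters
  //   public static Collection<Object[]> data() {
  //     return HBaseTestingUtil.MEMSTORETS_TAGS_PARAMETRIZED;
  //   }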

  /**
   * Checks to see if a specific port is available.
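   * <p>
   * A hedged usage sketch, probing a small range for a free port (the range is illustrative):
   *
   * <pre>
   * int freePort = -1;
   * for (int p = 50000; p &lt; 50010; p++) {
   *   if (HBaseTestingUtil.available(p)) {
   *     freePort = p;
   *     break;
   *   }
   * }
   * </pre>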
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<>();
    for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create combinations of memstoreTS and tags
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
    List<Object[]> configurations = new ArrayList<>();
    configurations.add(new Object[] { false, false, true });
    configurations.add(new Object[] { false, false, false });
    configurations.add(new Object[] { false, true, true });
    configurations.add(new Object[] { false, true, false });
    configurations.add(new Object[] { true, false, true });
    configurations.add(new Object[] { true, false, false });
    configurations.add(new Object[] { true, true, true });
    configurations.add(new Object[] { true, true, false });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
    bloomAndCompressionCombinations();

  /**
   * <p>
   * Create an HBaseTestingUtil using a default configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   */
  public HBaseTestingUtil() {
    this(HBaseConfiguration.create());
  }

  /**
   * <p>
   * Create an HBaseTestingUtil using a given configuration.
   * <p>
   * Initially, all tmp files are written to a local test data directory. Once
   * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp
   * data will be written to the DFS directory instead.
   * @param conf The configuration to use for further operations
   */
  public HBaseTestingUtil(@Nullable Configuration conf) {
    super(conf);

    // a hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);

    // Save this for when setting default file:// breaks things
    if (this.conf.get("fs.defaultFS") != null) {
      this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS"));
    }
    if (this.conf.get(HConstants.HBASE_DIR) != null) {
      this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR));
    }
    // Every cluster is a local cluster until we start DFS
    // Note that conf could be null, but this.conf will not be
    String dataTestDir = getDataTestDir().toString();
    this.conf.set("fs.defaultFS", "file:///");
    this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);
    this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
    // If the value for random ports isn't set, default it to true, thus making
    // random port assignment opt-out for tests
    this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS,
      this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true));
  }

  /**
   * Close both the region {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final Region r) throws IOException {
    closeRegionAndWAL((HRegion) r);
  }

  /**
   * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
   */
  public static void closeRegionAndWAL(final HRegion r) throws IOException {
    if (r == null) return;
    r.close();
    if (r.getWAL() == null) return;
    r.getWAL().close();
  }

  /**
   * Returns this class's instance of {@link Configuration}. Be careful how you use the returned
   * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed
   * by the Configuration. If say, a Connection was being used against a cluster that had been shut
   * down, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome.
   * Rather than use the returned Configuration directly, it's usually best to make a copy and use
   * that. Do <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseClusterInterface hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so we
   * can have many concurrent tests running if we need to. Modifying a System property is not the
   * way to do concurrent instances -- another instance could grab the temporary value
   * unintentionally -- but there is nothing we can do about that at the moment; single instance
   * only is how the minidfscluster works. We also create the underlying directory names for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values in the
   * conf, and as a system property for hadoop.tmp.dir (we do not create the directories
   * themselves!).
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir");
    return testPath;
  }

  private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue
        + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(propertyName + " property value differs in configuration and system: "
          + "Configuration=" + confValue + " while System=" + sysValue
          + " Overriding the configuration value with the system value.");
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; by default this is a 'test-data'
   *         directory under the working directory of the test filesystem
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write
   * temporary test data. Call this method after setting up the mini dfs cluster if the test relies
   * on it.
   * @return a unique path in the test filesystem
   * @param subdirName name of the subdir to create under the base test dir
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests. Creates a new directory if not
   * already set up.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up in " + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir;
    String randomStr = getRandomUUID().toString();
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      newDataTestDir = new Path(getDataTestDir(), randomStr);
      File dataTestDir = new File(newDataTestDir.toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
    } else {
      Path base = getBaseTestDirOnTestFS();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed child
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster. This is useful if you want to run datanodes on distinct hosts for
   * things like HDFS block location verification. If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param hosts hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster. Can only create one.
   * @param servers How many DNs to start.
   * @param hosts   hostnames the DNs should run on.
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception {
    return startMiniDFSCluster(servers, null, hosts);
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // re-enable this check with dfs
    conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE);
  }

  // Workaround to avoid IllegalThreadStateException
  // See HBASE-27148 for more details
  private static final class FsDatasetAsyncDiskServiceFixer extends Thread {

    private volatile boolean stopped = false;

    private final MiniDFSCluster cluster;

    FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) {
      super("FsDatasetAsyncDiskServiceFixer");
      setDaemon(true);
      this.cluster = cluster;
    }

    @Override
    public void run() {
      while (!stopped) {
        try {
          Thread.sleep(30000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          continue;
        }
        // we could add new datanodes during tests, so here we will check every 30 seconds, as the
        // timeout of the thread pool executor is 60 seconds by default.
        try {
          for (DataNode dn : cluster.getDataNodes()) {
            FsDatasetSpi<?> dataset = dn.getFSDataset();
            Field service = dataset.getClass().getDeclaredField("asyncDiskService");
            service.setAccessible(true);
            Object asyncDiskService = service.get(dataset);
            Field group = asyncDiskService.getClass().getDeclaredField("threadGroup");
            group.setAccessible(true);
            ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService);
            if (threadGroup.isDaemon()) {
              threadGroup.setDaemon(false);
            }
          }
        } catch (NoSuchFieldException e) {
          LOG.debug("NoSuchFieldException: " + e.getMessage()
            + "; this might be because your Hadoop version is > 3.2.3 or 3.3.4, "
            + "see HBASE-27595 for details.");
        } catch (Exception e) {
          LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e);
        }
      }
    }

    void shutdown() {
      stopped = true;
      interrupt();
    }
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
    throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    this.dfsCluster =
      new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // reset the test directory for test file system
    dataTestDirOnTestFS = null;
    String dataTestDir = getDataTestDir().toString();
    conf.set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir);

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster =
      new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
    this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster);
    this.dfsClusterFixer.start();
    return dfsCluster;
  }

  /**
   * This is used before starting HDFS and map-reduce mini-clusters. Run something like the below
   * to check for the likes of '/tmp' references -- i.e. references outside of the test data dir --
   * in the conf.
   *
   * <pre>
   * Configuration conf = TEST_UTIL.getConfiguration();
   * for (Iterator&lt;Map.Entry&lt;String, String&gt;&gt; i = conf.iterator(); i.hasNext();) {
   *   Map.Entry&lt;String, String&gt; e = i.next();
   *   assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp"));
   * }
   * </pre>
   */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath());
    createDirAndSetProperty("test.cache.data");
    createDirAndSetProperty("hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop.log.dir");
    createDirAndSetProperty("mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
    conf.set("yarn.app.mapreduce.am.staging-dir",
      new Path(root, "mapreduce-am-staging-root-dir").toString());

    // Frustrate yarn's and hdfs's attempts at writing /tmp.
    // Below is fragile. Make it so we just interpolate any 'tmp' reference.
    createDirAndSetProperty("yarn.node-labels.fs-store.root-dir");
    createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir");
    createDirAndSetProperty("yarn.nodemanager.log-dirs");
    createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir");
    createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir");
    createDirAndSetProperty("dfs.journalnode.edits.dir");
    createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths");
    createDirAndSetProperty("nfs.dump.dir");
    createDirAndSetProperty("java.io.tmpdir");
    createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir");
    createDirAndSetProperty("fs.s3a.committer.staging.tmp.path");
  }

  /**
   * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families.
   * Defaults to false.
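   * <p>
   * Enable it from the command line with {@code -Dhbase.tests.new.version.behavior=true} (the
   * system property read below).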
   */
  public boolean isNewVersionBehaviorEnabled() {
    final String propName = "hbase.tests.new.version.behavior";
    String v = System.getProperty(propName);
    if (v != null) {
      return Boolean.parseBoolean(v);
    }
    return false;
  }

  /**
   * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This
   * allows specifying this parameter on the command line. If not set, the default is false.
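   * <p>
   * For example, pass {@code -Dhbase.tests.use.shortcircuit.reads=true} to turn short-circuit
   * reads on for a test run.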
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /**
   * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings,
   * including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String property) {
    return createDirAndSetProperty(property, property);
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing.
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      // It is possible that the dfs cluster is set through setDFSCluster method, where we will not
      // have a fixer
      if (dfsClusterFixer != null) {
        this.dfsClusterFixer.shutdown();
        dfsClusterFixer = null;
      }
      dataTestDirOnTestFS = null;
      CommonFSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper clusters with the given number of slave
   * nodes. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numSlaves number of slave nodes, for both HBase region servers and HDFS data nodes.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception {
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(numSlaves).numDataNodes(numSlaves).build();
    return startMiniCluster(option);
  }

  /**
   * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default
   * values can be found in {@link StartTestingClusterOption.Builder}.
   * @see #startMiniCluster(StartTestingClusterOption option)
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies
   * Configuration. It homes the cluster data directory under a random subdirectory in a directory
   * under System property test.build.data, to be cleaned up on exit.
   * @see #shutdownMiniDFSCluster()
   */
  public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option)
    throws Exception {
    LOG.info("Starting up minicluster with option: {}", option);

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();

    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (dfsCluster == null) {
      LOG.info("STARTING DFS");
      dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts());
    } else {
      LOG.info("NOT STARTING DFS");
    }

    // Start up a zk cluster.
    if (getZkCluster() == null) {
      startMiniZKCluster(option.getNumZkServers());
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters.
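   * <p>
   * A hedged sketch of stepped startup using the option builder (the counts are illustrative):
   *
   * <pre>
   * TEST_UTIL.startMiniDFSCluster(3);
   * TEST_UTIL.startMiniZKCluster();
   * StartTestingClusterOption option =
   *   StartTestingClusterOption.builder().numMasters(2).numRegionServers(3).build();
   * TEST_UTIL.startMiniHBaseCluster(option);
   * </pre>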
   * @return Reference to the mini hbase cluster.
   * @see #startMiniCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir(option.isCreateRootDir());
    if (option.isCreateWALDir()) {
      createWALRootDir();
    }
    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    // These settings will make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers());
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers());
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Populate the master address configuration from mini cluster configuration.
    conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c));
    // Don't leave here till we've done a successful scan of the hbase:meta
    try (Table t = getConnection().getTable(TableName.META_TABLE_NAME);
      ResultScanner s = t.getScanner(new Scan())) {
      for (;;) {
        if (s.next() == null) {
          break;
        }
      }
    }

    getAdmin(); // create immediately the hbaseAdmin
    LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster());

    return (SingleProcessHBaseCluster) hbaseCluster;
  }

  /**
   * Starts up mini hbase cluster using default options. Default options can be found in
   * {@link StartTestingClusterOption.Builder}.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see #shutdownMiniHBaseCluster()
   */
  public SingleProcessHBaseCluster startMiniHBaseCluster()
    throws IOException, InterruptedException {
    return startMiniHBaseCluster(StartTestingClusterOption.builder().build());
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers)
    throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .numRegionServers(numRegionServers).rsPorts(rsPorts).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts up mini hbase cluster. Usually you won't want this. You'll usually want
   * {@link #startMiniCluster()}. All other options will use default values, defined in
   * {@link StartTestingClusterOption.Builder}.
   * @param numMasters       Master node number.
   * @param numRegionServers Number of region servers.
   * @param rsPorts          Ports that RegionServer should use.
   * @param masterClass      The class to use as HMaster, or null for default.
   * @param rsClass          The class to use as HRegionServer, or null for default.
   * @param createRootDir    Whether to create a new root or data directory path.
   * @param createWALDir     Whether to create a new WAL directory.
   * @return The mini HBase cluster created.
   * @see #shutdownMiniHBaseCluster()
   * @deprecated since 2.2.0 and will be removed in 4.0.0. Use
   *             {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead.
   * @see #startMiniHBaseCluster(StartTestingClusterOption)
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a>
   */
  @Deprecated
  public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers,
    List<Integer> rsPorts, Class<? extends HMaster> masterClass,
    Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass,
    boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException {
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters)
      .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts)
      .createRootDir(createRootDir).createWALDir(createWALDir).build();
    return startMiniHBaseCluster(option);
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you
   * want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.restartHBaseCluster(servers, null);
  }

  public void restartHBaseCluster(int servers, List<Integer> ports)
    throws IOException, InterruptedException {
    StartTestingClusterOption option =
      StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build();
    restartHBaseCluster(option);
    invalidateConnection();
  }

  public void restartHBaseCluster(StartTestingClusterOption option)
    throws IOException, InterruptedException {
    closeConnection();
    this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(),
      option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(),
      option.getMasterClass(), option.getRsClass());
    // Don't leave here till we've done a successful scan of the hbase:meta
    Connection conn = ConnectionFactory.createConnection(this.conf);
    Table t = conn.getTable(TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
    conn.close();
  }

  /**
   * Returns current mini hbase cluster. Only has something in it after a call to
   * {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public SingleProcessHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) {
      return (SingleProcessHBaseCluster) this.hbaseCluster;
    }
    throw new RuntimeException(
      hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName());
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    shutdownMiniDFSCluster();
    shutdownMiniZKCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }
  /**
   * Shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Abruptly shuts down the HBase mini cluster. Does not shut down zk or dfs if running.
   * @throws java.io.IOException in case the command is unsuccessful
   */
  public void killMiniHBaseCluster() throws IOException {
    cleanup();
    if (this.hbaseCluster != null) {
      getMiniHBaseCluster().killAll();
      this.hbaseCluster = null;
    }
    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  // close hbase admin, close current connection and reset MIN MAX configs for RS.
  private void cleanup() throws IOException {
    closeConnection();
    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true,
   * a new root directory path is fetched irrespective of whether it has been fetched before or not.
   * If false, previous path is used. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that
   * <code>create</code> flag is false. Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you
   * won't make use of this method. Root hbasedir is created for you as part of mini cluster
   * startup. You'd only use this method if you were doing manual operations.
   * @param create This flag decides whether to get a new root or data directory path or not, if it
   *               has been fetched already. Note: the directory will be made irrespective of
   *               whether the path has been fetched or not. If the directory already exists, it
   *               will be overwritten.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    CommonFSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code>
   * flag is false.
   * @return Fully qualified path to hbase root dir
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase WAL dir in the user's home directory. Normally you won't make use of this
   * method. The root hbaseWALDir is created for you as part of mini cluster startup. You'd only
   * use this method if you were doing manual operations.
   * @return Fully qualified path to the hbase WAL root dir
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    CommonFSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches of the given table in the mini hbase cluster
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
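   * <p>
   * A hedged usage sketch on an assumed TEST_UTIL instance (the table, family, and value names
   * are illustrative):
   *
   * <pre>
   * try (Table t = TEST_UTIL.createTable(TableName.valueOf("users"), "info")) {
   *   t.put(new Put(Bytes.toBytes("row1")).addColumn(Bytes.toBytes("info"), Bytes.toBytes("q"),
   *     Bytes.toBytes("v1")));
   * }
   * </pre>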
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String family) throws IOException {
    return createTable(tableName, new String[] { family });
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, String[] families) throws IOException {
    List<byte[]> fams = new ArrayList<>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[] family) throws IOException {
    return createTable(tableName, new byte[][] { family });
  }

  /**
   * Create a table with multiple regions.
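   * <p>
   * A hedged usage sketch (the names and region count are illustrative); the split keys are
   * computed between "aaaaa" and "zzzzz" as in the method body below:
   *
   * <pre>
   * Table t = TEST_UTIL.createMultiRegionTable(TableName.valueOf("t"), Bytes.toBytes("f"), 8);
   * </pre>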
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
    throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte[] startKey = Bytes.toBytes("aaaaa");
    byte[] endKey = Bytes.toBytes("zzzzz");
    byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);

    return createTable(tableName, new byte[][] { family }, splitKeys);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, (byte[][]) null);
  }

  /**
   * Create a table with multiple regions.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
  }

  /**
   * Create a table with multiple regions.
   * @param replicaCount replica count.
   * @return A Table instance for the created table.
   */
  public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families)
    throws IOException {
    return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount);
  }

  /**
   * Create a table.
   * @return A Table instance for the created table.
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
    throws IOException {
    return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName    the table name
   * @param families     the families
   * @param splitKeys    the split keys
   * @param replicaCount the region replica count
   * @return A Table instance for the created table.
   * @throws IOException if table creation fails
   */
  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
    int replicaCount) throws IOException {
    return createTable(tableName, families, splitKeys, replicaCount,
      new Configuration(getConfiguration()));
  }

  public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey,
    byte[] endKey, int numRegions) throws IOException {
    TableDescriptor desc = createTableDescriptor(tableName, families, numVersions);

    getAdmin().createTable(desc, startKey, endKey, numRegions);
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1272    waitUntilAllRegionsAssigned(tableName);
1273    return getConnection().getTable(tableName);
1274  }
1275
1276  /**
1277   * Create a table.
1278   * @param c Configuration to use
1279   * @return A Table instance for the created table.
1280   */
1281  public Table createTable(TableDescriptor htd, byte[][] families, Configuration c)
1282    throws IOException {
1283    return createTable(htd, families, null, c);
1284  }
1285
1286  /**
1287   * Create a table.
1288   * @param htd       table descriptor
1289   * @param families  array of column families
1290   * @param splitKeys array of split keys
1291   * @param c         Configuration to use
1292   * @return A Table instance for the created table.
1293   * @throws IOException if getAdmin or createTable fails
1294   */
1295  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1296    Configuration c) throws IOException {
    // Disable blooms (they are on by default as of 0.95) because some tests have hard-coded
    // counts of what to expect in the block cache, etc., and having blooms enabled interferes
    // with those counts.
1300    return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c);
1301  }
1302
1303  /**
1304   * Create a table.
1305   * @param htd       table descriptor
1306   * @param families  array of column families
1307   * @param splitKeys array of split keys
1308   * @param type      Bloom type
1309   * @param blockSize block size
1310   * @param c         Configuration to use
1311   * @return A Table instance for the created table.
1312   * @throws IOException if getAdmin or createTable fails
1313   */
1315  public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys,
1316    BloomType type, int blockSize, Configuration c) throws IOException {
1317    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1318    for (byte[] family : families) {
1319      ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family)
1320        .setBloomFilterType(type).setBlocksize(blockSize);
1321      if (isNewVersionBehaviorEnabled()) {
1322        cfdb.setNewVersionBehavior(true);
1323      }
1324      builder.setColumnFamily(cfdb.build());
1325    }
1326    TableDescriptor td = builder.build();
1327    if (splitKeys != null) {
1328      getAdmin().createTable(td, splitKeys);
1329    } else {
1330      getAdmin().createTable(td);
1331    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1334    waitUntilAllRegionsAssigned(td.getTableName());
1335    return getConnection().getTable(td.getTableName());
1336  }
1337
1338  /**
1339   * Create a table.
1340   * @param htd       table descriptor
1341   * @param splitRows array of split keys
1342   * @return A Table instance for the created table.
1343   */
1344  public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException {
1345    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
1346    if (isNewVersionBehaviorEnabled()) {
1347      for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) {
1348        builder.setColumnFamily(
1349          ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build());
1350      }
1351    }
1352    if (splitRows != null) {
1353      getAdmin().createTable(builder.build(), splitRows);
1354    } else {
1355      getAdmin().createTable(builder.build());
1356    }
    // HBaseAdmin only waits for regions to appear in hbase:meta;
    // we should wait until they are assigned
1359    waitUntilAllRegionsAssigned(htd.getTableName());
1360    return getConnection().getTable(htd.getTableName());
1361  }
1362
1363  /**
1364   * Create a table.
1365   * @param tableName    the table name
1366   * @param families     the families
1367   * @param splitKeys    the split keys
1368   * @param replicaCount the replica count
1369   * @param c            Configuration to use
1370   * @return A Table instance for the created table.
1371   */
1372  public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1373    int replicaCount, final Configuration c) throws IOException {
1374    TableDescriptor htd =
1375      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build();
1376    return createTable(htd, families, splitKeys, c);
1377  }
1378
1379  /**
1380   * Create a table.
1381   * @return A Table instance for the created table.
1382   */
1383  public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException {
1384    return createTable(tableName, new byte[][] { family }, numVersions);
1385  }
1386
1387  /**
1388   * Create a table.
1389   * @return A Table instance for the created table.
1390   */
1391  public Table createTable(TableName tableName, byte[][] families, int numVersions)
1392    throws IOException {
1393    return createTable(tableName, families, numVersions, (byte[][]) null);
1394  }
1395
1396  /**
1397   * Create a table.
1398   * @return A Table instance for the created table.
1399   */
1400  public Table createTable(TableName tableName, byte[][] families, int numVersions,
1401    byte[][] splitKeys) throws IOException {
1402    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1403    for (byte[] family : families) {
1404      ColumnFamilyDescriptorBuilder cfBuilder =
1405        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions);
1406      if (isNewVersionBehaviorEnabled()) {
1407        cfBuilder.setNewVersionBehavior(true);
1408      }
1409      builder.setColumnFamily(cfBuilder.build());
1410    }
1411    if (splitKeys != null) {
1412      getAdmin().createTable(builder.build(), splitKeys);
1413    } else {
1414      getAdmin().createTable(builder.build());
1415    }
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they
    // are assigned
1418    waitUntilAllRegionsAssigned(tableName);
1419    return getConnection().getTable(tableName);
1420  }
1421
1422  /**
1423   * Create a table with multiple regions.
1424   * @return A Table instance for the created table.
1425   */
1426  public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1427    throws IOException {
1428    return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1429  }
1430
1431  /**
1432   * Create a table.
1433   * @return A Table instance for the created table.
1434   */
1435  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize)
1436    throws IOException {
1437    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1438    for (byte[] family : families) {
1439      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1440        .setMaxVersions(numVersions).setBlocksize(blockSize);
1441      if (isNewVersionBehaviorEnabled()) {
1442        cfBuilder.setNewVersionBehavior(true);
1443      }
1444      builder.setColumnFamily(cfBuilder.build());
1445    }
1446    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they
    // are assigned
1449    waitUntilAllRegionsAssigned(tableName);
1450    return getConnection().getTable(tableName);
1451  }
1452
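  /**
   * Create a table whose column families have the given max versions and block size, optionally
   * loading the coprocessor named by {@code cpName} on the table.
   * @return A Table instance for the created table.
   */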
1453  public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize,
1454    String cpName) throws IOException {
1455    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1456    for (byte[] family : families) {
1457      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family)
1458        .setMaxVersions(numVersions).setBlocksize(blockSize);
1459      if (isNewVersionBehaviorEnabled()) {
1460        cfBuilder.setNewVersionBehavior(true);
1461      }
1462      builder.setColumnFamily(cfBuilder.build());
1463    }
1464    if (cpName != null) {
1465      builder.setCoprocessor(cpName);
1466    }
1467    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they
    // are assigned
1470    waitUntilAllRegionsAssigned(tableName);
1471    return getConnection().getTable(tableName);
1472  }
1473
1474  /**
1475   * Create a table.
1476   * @return A Table instance for the created table.
1477   */
1478  public Table createTable(TableName tableName, byte[][] families, int[] numVersions)
1479    throws IOException {
1480    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1481    int i = 0;
1482    for (byte[] family : families) {
1483      ColumnFamilyDescriptorBuilder cfBuilder =
1484        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]);
1485      if (isNewVersionBehaviorEnabled()) {
1486        cfBuilder.setNewVersionBehavior(true);
1487      }
1488      builder.setColumnFamily(cfBuilder.build());
1489      i++;
1490    }
1491    getAdmin().createTable(builder.build());
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they
    // are assigned
1494    waitUntilAllRegionsAssigned(tableName);
1495    return getConnection().getTable(tableName);
1496  }
1497
1498  /**
1499   * Create a table.
1500   * @return A Table instance for the created table.
1501   */
1502  public Table createTable(TableName tableName, byte[] family, byte[][] splitRows)
1503    throws IOException {
1504    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1505    ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
1506    if (isNewVersionBehaviorEnabled()) {
1507      cfBuilder.setNewVersionBehavior(true);
1508    }
1509    builder.setColumnFamily(cfBuilder.build());
1510    getAdmin().createTable(builder.build(), splitRows);
    // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they
    // are assigned
1513    waitUntilAllRegionsAssigned(tableName);
1514    return getConnection().getTable(tableName);
1515  }
1516
1517  /**
1518   * Create a table with multiple regions.
1519   * @return A Table instance for the created table.
1520   */
1521  public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1522    return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1523  }
1524
1525  /**
1526   * Set the number of Region replicas.
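   * <p>
   * For example (illustrative):
   *
   * <pre>{@code
   * // reopen the table's regions with three replicas each (including the primary)
   * setReplicas(util.getAdmin(), tableName, 3);
   * }</pre>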
1527   */
1528  public static void setReplicas(Admin admin, TableName table, int replicaCount)
1529    throws IOException, InterruptedException {
1530    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table))
1531      .setRegionReplication(replicaCount).build();
1532    admin.modifyTable(desc);
1533  }
1534
1535  /**
1536   * Set the number of Region replicas.
1537   */
1538  public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount)
1539    throws ExecutionException, IOException, InterruptedException {
1540    TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get())
1541      .setRegionReplication(replicaCount).build();
1542    admin.modifyTable(desc).get();
1543  }
1544
1545  /**
1546   * Drop an existing table
1547   * @param tableName existing table
1548   */
1549  public void deleteTable(TableName tableName) throws IOException {
1550    try {
1551      getAdmin().disableTable(tableName);
1552    } catch (TableNotEnabledException e) {
1553      LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1554    }
1555    getAdmin().deleteTable(tableName);
1556  }
1557
1558  /**
1559   * Drop an existing table
1560   * @param tableName existing table
1561   */
1562  public void deleteTableIfAny(TableName tableName) throws IOException {
1563    try {
1564      deleteTable(tableName);
1565    } catch (TableNotFoundException e) {
1566      // ignore
1567    }
1568  }
1569
1570  // ==========================================================================
1571  // Canned table and table descriptor creation
1572
1573  public final static byte[] fam1 = Bytes.toBytes("colfamily11");
1574  public final static byte[] fam2 = Bytes.toBytes("colfamily21");
1575  public final static byte[] fam3 = Bytes.toBytes("colfamily31");
1576  public static final byte[][] COLUMNS = { fam1, fam2, fam3 };
1577  private static final int MAXVERSIONS = 3;
1578
1579  public static final char FIRST_CHAR = 'a';
1580  public static final char LAST_CHAR = 'z';
1581  public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
1582  public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1583
1584  public TableDescriptorBuilder createModifyableTableDescriptor(final String name) {
1585    return createModifyableTableDescriptor(TableName.valueOf(name),
1586      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
1587      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1588  }
1589
  public TableDescriptor createTableDescriptor(final TableName name, final int minVersions,
    final int versions, final int ttl, KeepDeletedCells keepDeleted) {
    // Same column family setup as the modifyable variant below; just build the descriptor.
    return createModifyableTableDescriptor(name, minVersions, versions, ttl, keepDeleted).build();
  }
1604
1605  public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name,
1606    final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1607    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
1608    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
1609      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
1610        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
1611        .setBlockCacheEnabled(false).setTimeToLive(ttl);
1612      if (isNewVersionBehaviorEnabled()) {
1613        cfBuilder.setNewVersionBehavior(true);
1614      }
1615      builder.setColumnFamily(cfBuilder.build());
1616    }
1617    return builder;
1618  }
1619
  /**
   * Create a table descriptor for a table of name <code>name</code>.
   * @param name Name to give table.
   * @return Table descriptor.
   */
1625  public TableDescriptor createTableDescriptor(final TableName name) {
1626    return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS,
1627      MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
1628  }
1629
1630  public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) {
1631    return createTableDescriptor(tableName, new byte[][] { family }, 1);
1632  }
1633
1634  public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families,
1635    int maxVersions) {
1636    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1637    for (byte[] family : families) {
1638      ColumnFamilyDescriptorBuilder cfBuilder =
1639        ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions);
1640      if (isNewVersionBehaviorEnabled()) {
1641        cfBuilder.setNewVersionBehavior(true);
1642      }
1643      builder.setColumnFamily(cfBuilder.build());
1644    }
1645    return builder.build();
1646  }
1647
1648  /**
1649   * Create an HRegion that writes to the local tmp dirs
1650   * @param desc     a table descriptor indicating which table the region belongs to
1651   * @param startKey the start boundary of the region
1652   * @param endKey   the end boundary of the region
1653   * @return a region that writes to local dir for testing
1654   */
1655  public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey)
1656    throws IOException {
1657    RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey)
1658      .setEndKey(endKey).build();
1659    return createLocalHRegion(hri, desc);
1660  }
1661
1662  /**
1663   * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call
1664   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it.
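   * <p>
   * A typical pattern (illustrative; {@code util} stands for this testing utility and
   * {@code desc} is an existing table descriptor):
   *
   * <pre>{@code
   * RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).build();
   * HRegion region = util.createLocalHRegion(hri, desc);
   * try {
   *   // exercise the region directly, without a cluster
   * } finally {
   *   HBaseTestingUtil.closeRegionAndWAL(region);
   * }
   * }</pre>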
1665   */
1666  public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException {
1667    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc);
1668  }
1669
  /**
   * Create an HRegion that writes to the local tmp dirs with the specified WAL.
   * @param info regioninfo
   * @param conf configuration
   * @param desc table descriptor
   * @param wal  WAL for this region
   * @return created hregion
   */
1678  public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
1679    WAL wal) throws IOException {
1680    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
1681      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
1682    return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
1683  }
1684
1685  /**
1686   * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
1687   *         when done.
1688   */
1689  public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
1690    Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
1691    throws IOException {
1692    return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly,
1693      durability, wal, null, families);
1694  }
1695
1696  public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey,
1697    byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal,
1698    boolean[] compactedMemStore, byte[]... families) throws IOException {
1699    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
1700    builder.setReadOnly(isReadOnly);
1701    int i = 0;
1702    for (byte[] family : families) {
1703      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
      if (compactedMemStore != null && i < compactedMemStore.length) {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
      } else {
        cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE);
      }
      i++;
      // Keep every version; tests rely on being able to read back all writes.
      cfBuilder.setMaxVersions(Integer.MAX_VALUE);
1713      builder.setColumnFamily(cfBuilder.build());
1714    }
1715    builder.setDurability(durability);
1716    RegionInfo info =
1717      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
1718    return createLocalHRegion(info, conf, builder.build(), wal);
1719  }
1720
1721  //
1722  // ==========================================================================
1723
1724  /**
   * Delete all data in the named existing table by scanning it and issuing a delete for each
   * row read.
   * @param tableName existing table
   * @return Table instance for the (now empty) table
1729   */
  public Table deleteTableData(TableName tableName) throws IOException {
    Table table = getConnection().getTable(tableName);
    Scan scan = new Scan();
    // Close the scanner when done; the previous version leaked the scanner it iterated and
    // opened a second one for no purpose.
    try (ResultScanner resScan = table.getScanner(scan)) {
      for (Result res : resScan) {
        table.delete(new Delete(res.getRow()));
      }
    }
    return table;
  }
1742
1743  /**
1744   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1745   * table.
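   * <p>
   * A minimal sketch (illustrative; assumes a populated {@code table}):
   *
   * <pre>{@code
   * util.truncateTable(table.getName(), true); // keep the existing split points
   * assertEquals(0, HBaseTestingUtil.countRows(table));
   * }</pre>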
1746   * @param tableName       table which must exist.
1747   * @param preserveRegions keep the existing split points
   * @return Table instance for the (re-created) table
1749   */
1750  public Table truncateTable(final TableName tableName, final boolean preserveRegions)
1751    throws IOException {
1752    Admin admin = getAdmin();
1753    if (!admin.isTableDisabled(tableName)) {
1754      admin.disableTable(tableName);
1755    }
1756    admin.truncateTable(tableName, preserveRegions);
1757    return getConnection().getTable(tableName);
1758  }
1759
1760  /**
1761   * Truncate a table using the admin command. Effectively disables, deletes, and recreates the
1762   * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not
1763   * preserve regions of existing table.
1764   * @param tableName table which must exist.
   * @return Table instance for the (re-created) table
1766   */
1767  public Table truncateTable(final TableName tableName) throws IOException {
1768    return truncateTable(tableName, false);
1769  }
1770
1771  /**
1772   * Load table with rows from 'aaa' to 'zzz'.
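   * <p>
   * Illustrative use together with {@link #countRows(Table)} ({@code util} stands for this
   * testing utility):
   *
   * <pre>{@code
   * byte[] fam = Bytes.toBytes("cf");
   * try (Table table = util.createTable(TableName.valueOf("testLoad"), new byte[][] { fam })) {
   *   int loaded = util.loadTable(table, fam);
   *   assertEquals(loaded, HBaseTestingUtil.countRows(table));
   * }
   * }</pre>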
1773   * @param t Table
1774   * @param f Family
1775   * @return Count of rows loaded.
1776   */
1777  public int loadTable(final Table t, final byte[] f) throws IOException {
1778    return loadTable(t, new byte[][] { f });
1779  }
1780
  /**
   * Load table with rows from 'aaa' to 'zzz'.
   * @param t          Table
   * @param f          Family
   * @param writeToWAL true to write the edits to the WAL, false to skip the WAL
   * @return Count of rows loaded.
   */
1787  public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
1788    return loadTable(t, new byte[][] { f }, null, writeToWAL);
1789  }
1790
1791  /**
1792   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1793   * @param t Table
1794   * @param f Array of Families to load
1795   * @return Count of rows loaded.
1796   */
1797  public int loadTable(final Table t, final byte[][] f) throws IOException {
1798    return loadTable(t, f, null);
1799  }
1800
1801  /**
1802   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
1803   * @param t     Table
1804   * @param f     Array of Families to load
1805   * @param value the values of the cells. If null is passed, the row key is used as value
1806   * @return Count of rows loaded.
1807   */
1808  public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
1809    return loadTable(t, f, value, true);
1810  }
1811
1812  /**
1813   * Load table of multiple column families with rows from 'aaa' to 'zzz'.
   * @param t          Table
   * @param f          Array of Families to load
   * @param value      the values of the cells. If null is passed, the row key is used as value
   * @param writeToWAL true to write the edits to the WAL, false to skip the WAL
   * @return Count of rows loaded.
1818   */
1819  public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL)
1820    throws IOException {
1821    List<Put> puts = new ArrayList<>();
1822    for (byte[] row : HBaseTestingUtil.ROWS) {
1823      Put put = new Put(row);
1824      put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
1825      for (int i = 0; i < f.length; i++) {
1826        byte[] value1 = value != null ? value : row;
1827        put.addColumn(f[i], f[i], value1);
1828      }
1829      puts.add(put);
1830    }
1831    t.put(puts);
1832    return puts.size();
1833  }
1834
1835  /**
1836   * A tracker for tracking and validating table rows generated with
1837   * {@link HBaseTestingUtil#loadTable(Table, byte[])}
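   * <p>
   * An illustrative sketch ({@code table} is an open Table and the scan bounds are
   * placeholders):
   *
   * <pre>{@code
   * byte[] start = Bytes.toBytes("aaa");
   * byte[] stop = Bytes.toBytes("ccc");
   * SeenRowTracker tracker = new SeenRowTracker(start, stop);
   * Scan scan = new Scan().withStartRow(start).withStopRow(stop);
   * try (ResultScanner scanner = table.getScanner(scan)) {
   *   for (Result result : scanner) {
   *     tracker.addRow(result.getRow());
   *   }
   * }
   * tracker.validate(); // throws RuntimeException on a missing or duplicated row
   * }</pre>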
1838   */
1839  public static class SeenRowTracker {
1840    int dim = 'z' - 'a' + 1;
1841    int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen
1842    byte[] startRow;
1843    byte[] stopRow;
1844
1845    public SeenRowTracker(byte[] startRow, byte[] stopRow) {
1846      this.startRow = startRow;
1847      this.stopRow = stopRow;
1848    }
1849
1850    void reset() {
1851      for (byte[] row : ROWS) {
1852        seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
1853      }
1854    }
1855
1856    int i(byte b) {
1857      return b - 'a';
1858    }
1859
1860    public void addRow(byte[] row) {
1861      seenRows[i(row[0])][i(row[1])][i(row[2])]++;
1862    }
1863
    /**
     * Validate that all the rows between startRow and stopRow are seen exactly once, and that
     * all other rows are not seen at all.
     */
1868    public void validate() {
1869      for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1870        for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1871          for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1872            int count = seenRows[i(b1)][i(b2)][i(b3)];
1873            int expectedCount = 0;
1874            if (
1875              Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0
1876                && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0
1877            ) {
1878              expectedCount = 1;
1879            }
1880            if (count != expectedCount) {
1881              String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8);
1882              throw new RuntimeException("Row:" + row + " has a seen count of " + count + " "
1883                + "instead of " + expectedCount);
1884            }
1885          }
1886        }
1887      }
1888    }
1889  }
1890
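  /** Load region with rows from 'aaa' to 'zzz', without flushing. */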
1891  public int loadRegion(final HRegion r, final byte[] f) throws IOException {
1892    return loadRegion(r, f, false);
1893  }
1894
1895  public int loadRegion(final Region r, final byte[] f) throws IOException {
1896    return loadRegion((HRegion) r, f);
1897  }
1898
1899  /**
1900   * Load region with rows from 'aaa' to 'zzz'.
1901   * @param r     Region
1902   * @param f     Family
1903   * @param flush flush the cache if true
1904   * @return Count of rows loaded.
1905   */
1906  public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException {
1907    byte[] k = new byte[3];
1908    int rowCount = 0;
1909    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
1910      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
1911        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
1912          k[0] = b1;
1913          k[1] = b2;
1914          k[2] = b3;
          Put put = new Put(k);
          // Skip the WAL; this bulk test load does not need to be recoverable. (The previous
          // version redundantly set the same durability again when the region had no WAL.)
          put.setDurability(Durability.SKIP_WAL);
          put.addColumn(f, null, k);
1921          int preRowCount = rowCount;
1922          int pause = 10;
1923          int maxPause = 1000;
1924          while (rowCount == preRowCount) {
1925            try {
1926              r.put(put);
1927              rowCount++;
1928            } catch (RegionTooBusyException e) {
1929              pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
1930              Threads.sleep(pause);
1931            }
1932          }
1933        }
1934      }
1935      if (flush) {
1936        r.flush(true);
1937      }
1938    }
1939    return rowCount;
1940  }
1941
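  /**
   * Write one row per integer in the range [startRow, endRow) to the given table, using the
   * stringified integer as both row key and cell value.
   */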
1942  public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
1943    throws IOException {
1944    for (int i = startRow; i < endRow; i++) {
1945      byte[] data = Bytes.toBytes(String.valueOf(i));
1946      Put put = new Put(data);
1947      put.addColumn(f, null, data);
1948      t.put(put);
1949    }
1950  }
1951
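  /**
   * Write {@code totalRows} puts with random row keys of {@code rowSize} bytes to the given
   * table. Key collisions are possible, so the resulting row count may be lower than totalRows.
   */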
1952  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
1953    throws IOException {
1954    for (int i = 0; i < totalRows; i++) {
1955      byte[] row = new byte[rowSize];
1956      Bytes.random(row);
1957      Put put = new Put(row);
1958      put.addColumn(f, new byte[] { 0 }, new byte[] { 0 });
1959      t.put(put);
1960    }
1961  }
1962
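  /**
   * Verify rows written by {@link #loadNumericRows(Table, byte[], int, int)} by reading each row
   * back from the given replica with TIMELINE consistency.
   */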
1963  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
1964    int replicaId) throws IOException {
1965    for (int i = startRow; i < endRow; i++) {
1966      String failMsg = "Failed verification of row :" + i;
1967      byte[] data = Bytes.toBytes(String.valueOf(i));
1968      Get get = new Get(data);
1969      get.setReplicaId(replicaId);
1970      get.setConsistency(Consistency.TIMELINE);
1971      Result result = table.get(get);
1972      assertTrue(failMsg, result.containsColumn(f, null));
1973      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
1974      Cell cell = result.getColumnLatestCell(f, null);
1975      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
1976        cell.getValueOffset(), cell.getValueLength()));
1977    }
1978  }
1979
1980  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
1981    throws IOException {
1982    verifyNumericRows((HRegion) region, f, startRow, endRow);
1983  }
1984
1985  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
1986    throws IOException {
1987    verifyNumericRows(region, f, startRow, endRow, true);
1988  }
1989
1990  public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
1991    final boolean present) throws IOException {
1992    verifyNumericRows((HRegion) region, f, startRow, endRow, present);
1993  }
1994
1995  public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
1996    final boolean present) throws IOException {
1997    for (int i = startRow; i < endRow; i++) {
1998      String failMsg = "Failed verification of row :" + i;
1999      byte[] data = Bytes.toBytes(String.valueOf(i));
2000      Result result = region.get(new Get(data));
2001
2002      boolean hasResult = result != null && !result.isEmpty();
2003      assertEquals(failMsg + result, present, hasResult);
2004      if (!present) continue;
2005
2006      assertTrue(failMsg, result.containsColumn(f, null));
2007      assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2008      Cell cell = result.getColumnLatestCell(f, null);
2009      assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(),
2010        cell.getValueOffset(), cell.getValueLength()));
2011    }
2012  }
2013
2014  public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2015    throws IOException {
2016    for (int i = startRow; i < endRow; i++) {
2017      byte[] data = Bytes.toBytes(String.valueOf(i));
2018      Delete delete = new Delete(data);
2019      delete.addFamily(f);
2020      t.delete(delete);
2021    }
2022  }
2023
2024  /**
2025   * Return the number of rows in the given table.
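   * <p>
   * A quick sketch (illustrative; {@code fam}, {@code qual} and {@code value} are placeholder
   * byte arrays):
   *
   * <pre>{@code
   * int before = HBaseTestingUtil.countRows(table);
   * table.put(new Put(Bytes.toBytes("extra-row")).addColumn(fam, qual, value));
   * assertEquals(before + 1, HBaseTestingUtil.countRows(table));
   * }</pre>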
   * @param table the table to count rows in
2027   * @return count of rows
2028   */
2029  public static int countRows(final Table table) throws IOException {
2030    return countRows(table, new Scan());
2031  }
2032
2033  public static int countRows(final Table table, final Scan scan) throws IOException {
2034    try (ResultScanner results = table.getScanner(scan)) {
2035      int count = 0;
2036      while (results.next() != null) {
2037        count++;
2038      }
2039      return count;
2040    }
2041  }
2042
2043  public static int countRows(final Table table, final byte[]... families) throws IOException {
2044    Scan scan = new Scan();
2045    for (byte[] family : families) {
2046      scan.addFamily(family);
2047    }
2048    return countRows(table, scan);
2049  }
2050
2051  /**
2052   * Return the number of rows in the given table.
2053   */
2054  public int countRows(final TableName tableName) throws IOException {
2055    try (Table table = getConnection().getTable(tableName)) {
2056      return countRows(table);
2057    }
2058  }
2059
2060  public static int countRows(final Region region) throws IOException {
2061    return countRows(region, new Scan());
2062  }
2063
2064  public static int countRows(final Region region, final Scan scan) throws IOException {
2065    try (InternalScanner scanner = region.getScanner(scan)) {
2066      return countRows(scanner);
2067    }
2068  }
2069
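  /**
   * Count the results returned by the given scanner. Note that this sums the sizes of the
   * per-call cell batches, so for rows with multiple cells the value can exceed the row count.
   */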
2070  public static int countRows(final InternalScanner scanner) throws IOException {
2071    int scannedCount = 0;
2072    List<Cell> results = new ArrayList<>();
2073    boolean hasMore = true;
2074    while (hasMore) {
2075      hasMore = scanner.next(results);
2076      scannedCount += results.size();
2077      results.clear();
2078    }
2079    return scannedCount;
2080  }
2081
  /**
   * Return an md5 digest computed over all the row keys of a table, in scan order.
   */
  public String checksumRows(final Table table) throws Exception {
    MessageDigest digest = MessageDigest.getInstance("MD5");
    try (ResultScanner results = table.getScanner(new Scan())) {
      for (Result res : results) {
        digest.update(res.getRow());
      }
    }
    // MessageDigest#toString() does not render the digest bytes; return the hex form instead.
    return Bytes.toHex(digest.digest());
  }
2094
2095  /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */
2096  public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2097  static {
2098    int i = 0;
2099    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2100      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2101        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2102          ROWS[i][0] = b1;
2103          ROWS[i][1] = b2;
2104          ROWS[i][2] = b3;
2105          i++;
2106        }
2107      }
2108    }
2109  }
2110
2111  public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2112    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2113    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2114    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2115    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2116    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2117    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") };
2118
2119  public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"),
2120    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"),
2121    Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"),
2122    Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2123    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
2124    Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
2125    Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") };
2126
2127  /**
2128   * Create rows in hbase:meta for regions of the specified table with the specified start keys. The
2129   * first startKey should be a 0 length byte array if you want to form a proper range of regions.
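   * <p>
   * For example (illustrative), three start keys produce three regions covering the full key
   * space:
   *
   * <pre>{@code
   * byte[][] startKeys =
   *   { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), Bytes.toBytes("ccc") };
   * // yields regions [, bbb), [bbb, ccc) and [ccc, )
   * List<RegionInfo> regions = util.createMultiRegionsInMeta(util.getConfiguration(), htd, startKeys);
   * }</pre>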
2130   * @return list of region info for regions added to meta
2131   */
2132  public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf,
2133    final TableDescriptor htd, byte[][] startKeys) throws IOException {
2134    try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
2135      Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2136      List<RegionInfo> newRegions = new ArrayList<>(startKeys.length);
2137      MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(),
2138        TableState.State.ENABLED);
2139      // add custom ones
2140      for (int i = 0; i < startKeys.length; i++) {
2141        int j = (i + 1) % startKeys.length;
2142        RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i])
2143          .setEndKey(startKeys[j]).build();
2144        MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1);
2145        newRegions.add(hri);
2146      }
2147      return newRegions;
2148    }
2149  }
2150
2151  /**
2152   * Create an unmanaged WAL. Be sure to close it when you're through.
2153   */
2154  public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri)
2155    throws IOException {
    // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
    // unless we pass it along via the conf.
2158    Configuration confForWAL = new Configuration(conf);
2159    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
2160    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
2161  }
2162
2163  /**
   * Create a region with its own WAL. Be sure to call
2165   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
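   * <p>
   * A typical pattern (illustrative; {@code util} stands for this testing utility):
   *
   * <pre>{@code
   * HRegion region = HBaseTestingUtil.createRegionAndWAL(info, util.getDataTestDir(),
   *   util.getConfiguration(), htd);
   * try {
   *   // read from / write to the region
   * } finally {
   *   HBaseTestingUtil.closeRegionAndWAL(region);
   * }
   * }</pre>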
2166   */
2167  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2168    final Configuration conf, final TableDescriptor htd) throws IOException {
2169    return createRegionAndWAL(info, rootDir, conf, htd, true);
2170  }
2171
2172  /**
   * Create a region with its own WAL. Be sure to call
2174   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2175   */
2176  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2177    final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException {
2178    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2179    region.setBlockCache(blockCache);
2180    region.initialize();
2181    return region;
2182  }
2183
2184  /**
   * Create a region with its own WAL. Be sure to call
2186   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2187   */
2188  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2189    final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
2190    throws IOException {
2191    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
2192    region.setMobFileCache(mobFileCache);
2193    region.initialize();
2194    return region;
2195  }
2196
2197  /**
   * Create a region with its own WAL. Be sure to call
2199   * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources.
2200   */
2201  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
2202    final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException {
2203    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
2204      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
2205    WAL wal = createWal(conf, rootDir, info);
2206    return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
2207  }
2208
2209  /**
   * Find a region server other than the one passed in.
   * @return another region server, or null if there is no other
2212   */
2213  public HRegionServer getOtherRegionServer(HRegionServer rs) {
2214    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      if (rst.getRegionServer() != rs) {
2216        return rst.getRegionServer();
2217      }
2218    }
2219    return null;
2220  }
2221
2222  /**
   * Get a reference to the region server that holds the first region of the specified user
   * table.
   * @param tableName user table to look up in hbase:meta
   * @return region server that holds it, or null if the row doesn't exist
2227   */
2228  public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2229    throws IOException, InterruptedException {
2230    List<RegionInfo> regions = getAdmin().getRegions(tableName);
2231    if (regions == null || regions.isEmpty()) {
2232      return null;
2233    }
2234    LOG.debug("Found " + regions.size() + " regions for table " + tableName);
2235
2236    byte[] firstRegionName =
2237      regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst()
2238        .orElseThrow(() -> new IOException("online regions not found in table " + tableName));
2239
2240    LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName));
2241    long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2242      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2243    int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2244      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // HBASE_CLIENT_PAUSE is expressed in milliseconds, so sleep in milliseconds as well.
    RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MILLISECONDS);
2246    while (retrier.shouldRetry()) {
2247      int index = getMiniHBaseCluster().getServerWith(firstRegionName);
2248      if (index != -1) {
2249        return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2250      }
2251      // Came back -1. Region may not be online yet. Sleep a while.
2252      retrier.sleepUntilNextRetry();
2253    }
2254    return null;
2255  }
2256
2257  /**
   * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>s.
2259   * @throws IOException When starting the cluster fails.
2260   */
2261  public MiniMRCluster startMiniMapReduceCluster() throws IOException {
    // Set a very high max-disk-utilization percentage to keep the NodeManagers from failing.
2263    conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage",
2264      "99.0");
2265    startMiniMapReduceCluster(2);
2266    return mrCluster;
2267  }
2268
2269  /**
2270   * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its
2271   * internal static LOG_DIR variable.
2272   */
2273  private void forceChangeTaskLogDir() {
2274    Field logDirField;
2275    try {
2276      logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2277      logDirField.setAccessible(true);
2278
2279      Field modifiersField = ReflectionUtils.getModifiersField();
2280      modifiersField.setAccessible(true);
2281      modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2282
2283      logDirField.set(null, new File(hadoopLogDir, "userlogs"));
    } catch (SecurityException | NoSuchFieldException | IllegalArgumentException
      | IllegalAccessException e) {
      throw new RuntimeException(e);
    }
2293  }
2294
2295  /**
2296   * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2297   * filesystem.
   * @param servers The number of <code>TaskTracker</code>s to start.
2299   * @throws IOException When starting the cluster fails.
2300   */
2301  private void startMiniMapReduceCluster(final int servers) throws IOException {
2302    if (mrCluster != null) {
2303      throw new IllegalStateException("MiniMRCluster is already running");
2304    }
2305    LOG.info("Starting mini mapreduce cluster...");
2306    setupClusterTestDir();
2307    createDirsAndSetProperties();
2308
2309    forceChangeTaskLogDir();
2310
2311    //// hadoop2 specific settings
    // Tests were failing because this process used 6GB of virtual memory and was getting killed.
    // We raise the allowed vmem/pmem ratio so that processes don't get killed.
2314    conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2315
2316    // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2317    // this avoids the problem by disabling speculative task execution in tests.
2318    conf.setBoolean("mapreduce.map.speculative", false);
2319    conf.setBoolean("mapreduce.reduce.speculative", false);
2320    ////
2321
2322    // Yarn container runs in independent JVM. We need to pass the argument manually here if the
2323    // JDK version >= 17. Otherwise, the MiniMRCluster will fail.
2324    if (JVM.getJVMSpecVersion() >= 17) {
2325      String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", "");
2326      conf.set("yarn.app.mapreduce.am.command-opts",
2327        jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED");
2328    }
2329
2330    // Allow the user to override FS URI for this map-reduce cluster to use.
2331    mrCluster =
2332      new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(),
2333        1, null, null, new JobConf(this.conf));
2334    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2335    if (jobConf == null) {
2336      jobConf = mrCluster.createJobConf();
2337    }
2338
2339    // Hadoop MiniMR overwrites this while it should not
2340    jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir"));
2341    LOG.info("Mini mapreduce cluster started");
2342
2343    // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2344    // Our HBase MR jobs need several of these settings in order to properly run. So we copy the
2345    // necessary config properties here. YARN-129 required adding a few properties.
2346    conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2347    // this for mrv2 support; mr1 ignores this
2348    conf.set("mapreduce.framework.name", "yarn");
2349    conf.setBoolean("yarn.is.minicluster", true);
2350    String rmAddress = jobConf.get("yarn.resourcemanager.address");
2351    if (rmAddress != null) {
2352      conf.set("yarn.resourcemanager.address", rmAddress);
2353    }
2354    String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2355    if (historyAddress != null) {
2356      conf.set("mapreduce.jobhistory.address", historyAddress);
2357    }
2358    String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address");
2359    if (schedulerAddress != null) {
2360      conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2361    }
2362    String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address");
2363    if (mrJobHistoryWebappAddress != null) {
2364      conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress);
2365    }
2366    String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address");
2367    if (yarnRMWebappAddress != null) {
2368      conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress);
2369    }
2370  }
2371
2372  /**
2373   * Stops the previously started <code>MiniMRCluster</code>.
2374   */
2375  public void shutdownMiniMapReduceCluster() {
2376    if (mrCluster != null) {
2377      LOG.info("Stopping mini mapreduce cluster...");
2378      mrCluster.shutdown();
2379      mrCluster = null;
2380      LOG.info("Mini mapreduce cluster stopped");
2381    }
2382    // Restore configuration to point to local jobtracker
2383    conf.set("mapreduce.jobtracker.address", "local");
2384  }
2385
2386  /**
2387   * Create a stubbed out RegionServerService, mainly for getting FS.
2388   */
2389  public RegionServerServices createMockRegionServerService() throws IOException {
2390    return createMockRegionServerService((ServerName) null);
2391  }
2392
2393  /**
2394   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2395   * TestTokenAuthentication
2396   */
2397  public RegionServerServices createMockRegionServerService(RpcServerInterface rpc)
2398    throws IOException {
2399    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2400    rss.setFileSystem(getTestFileSystem());
2401    rss.setRpcServer(rpc);
2402    return rss;
2403  }
2404
2405  /**
2406   * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by
2407   * TestOpenRegionHandler
2408   */
2409  public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2410    final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2411    rss.setFileSystem(getTestFileSystem());
2412    return rss;
2413  }
2414
2415  /**
2416   * Expire the Master's session
2417   */
2418  public void expireMasterSession() throws Exception {
2419    HMaster master = getMiniHBaseCluster().getMaster();
2420    expireSession(master.getZooKeeper(), false);
2421  }
2422
2423  /**
2424   * Expire a region server's session
2425   * @param index which RS
2426   */
2427  public void expireRegionServerSession(int index) throws Exception {
2428    HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2429    expireSession(rs.getZooKeeper(), false);
2430    decrementMinRegionServerCount();
2431  }
2432
2433  private void decrementMinRegionServerCount() {
    // decrement the count for this.conf, for a newly spawned master
2435    // this.hbaseCluster shares this configuration too
2436    decrementMinRegionServerCount(getConfiguration());
2437
2438    // each master thread keeps a copy of configuration
2439    for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2440      decrementMinRegionServerCount(master.getMaster().getConfiguration());
2441    }
2442  }
2443
2444  private void decrementMinRegionServerCount(Configuration conf) {
2445    int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2446    if (currentCount != -1) {
2447      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1));
2448    }
2449  }
2450
2451  public void expireSession(ZKWatcher nodeZK) throws Exception {
2452    expireSession(nodeZK, false);
2453  }
2454
2455  /**
2456   * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2457   * http://hbase.apache.org/book.html#trouble.zookeeper
2458   * <p/>
2459   * There are issues when doing this:
2460   * <ol>
2461   * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li>
2462   * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li>
2463   * </ol>
2464   * @param nodeZK      - the ZK watcher to expire
2465   * @param checkStatus - true to check if we can create a Table with the current configuration.
2466   */
2467  public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception {
2468    Configuration c = new Configuration(this.conf);
2469    String quorumServers = ZKConfig.getZKQuorumServersString(c);
2470    ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2471    byte[] password = zk.getSessionPasswd();
2472    long sessionID = zk.getSessionId();
2473
    // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
    // so we create a first watcher to be sure that the
    // event was sent. We expect that if our watcher receives the event,
    // other watchers on the same machine will get it as well.
    // When we ask to close the connection, ZK does not close it before
    // we receive all the events, so we don't have to capture the event;
    // just closing the connection should be enough.
2481    ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() {
2482      @Override
2483      public void process(WatchedEvent watchedEvent) {
2484        LOG.info("Monitor ZKW received event=" + watchedEvent);
2485      }
2486    }, sessionID, password);
2487
2488    // Making it expire
2489    ZooKeeper newZK =
2490      new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password);
2491
    // Ensure that we have a connection to the server before closing it down; otherwise
    // the close-session event may be swallowed before we reach the CONNECTING state.
2494    long start = EnvironmentEdgeManager.currentTime();
2495    while (
2496      newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000
2497    ) {
2498      Thread.sleep(1);
2499    }
2500    newZK.close();
2501    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2502
2503    // Now closing & waiting to be sure that the clients get it.
2504    monitor.close();
2505
2506    if (checkStatus) {
2507      getConnection().getTable(TableName.META_TABLE_NAME).close();
2508    }
2509  }
2510
2511  /**
2512   * Get the Mini HBase cluster.
2513   * @return hbase cluster
2514   * @see #getHBaseClusterInterface()
2515   */
2516  public SingleProcessHBaseCluster getHBaseCluster() {
2517    return getMiniHBaseCluster();
2518  }
2519
2520  /**
2521   * Returns the HBaseCluster instance.
2522   * <p>
   * Returned object can be any of the subclasses of HBaseCluster, and the tests referring to this
   * should not assume that the cluster is a mini cluster or a distributed one. If the test only
   * works on a mini cluster, then the specific method {@link #getMiniHBaseCluster()} can be used
   * instead without the need to type-cast.
2527   */
2528  public HBaseClusterInterface getHBaseClusterInterface() {
2529    // implementation note: we should rename this method as #getHBaseCluster(),
2530    // but this would require refactoring 90+ calls.
2531    return hbaseCluster;
2532  }
2533
2534  /**
2535   * Resets the connections so that the next time getConnection() is called, a new connection is
   * created. This is needed in cases where the entire cluster / all the masters are shut down
   * and the connection is not valid anymore.
2538   * <p/>
2539   * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are
2540   * written, not all start() stop() calls go through this class. Most tests directly operate on the
2541   * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain
2542   * the connection state automatically. Cleaning this is a much bigger refactor.
2543   */
2544  public void invalidateConnection() throws IOException {
2545    closeConnection();
2546    // Update the master addresses if they changed.
2547    final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY);
2548    final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY);
2549    LOG.info("Invalidated connection. Updating master addresses before: {} after: {}",
2550      masterConfigBefore, masterConfAfter);
    conf.set(HConstants.MASTER_ADDRS_KEY, masterConfAfter);
2553  }
2554
2555  /**
   * Get a shared Connection to the cluster. This method is thread safe.
2557   * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2558   */
2559  public Connection getConnection() throws IOException {
2560    return getAsyncConnection().toConnection();
2561  }
2562
2563  /**
   * Get an assigned Connection to the cluster. This method is thread safe.
2565   * @param user assigned user
2566   * @return A Connection with assigned user.
2567   */
2568  public Connection getConnection(User user) throws IOException {
2569    return getAsyncConnection(user).toConnection();
2570  }
2571
2572  /**
   * Get a shared AsyncClusterConnection to the cluster. This method is thread safe.
2574   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown
2575   *         of cluster.
2576   */
2577  public AsyncClusterConnection getAsyncConnection() throws IOException {
2578    try {
2579      return asyncConnection.updateAndGet(connection -> {
2580        if (connection == null) {
2581          try {
2582            User user = UserProvider.instantiate(conf).getCurrent();
2583            connection = getAsyncConnection(user);
2584          } catch (IOException ioe) {
2585            throw new UncheckedIOException("Failed to create connection", ioe);
2586          }
2587        }
2588        return connection;
2589      });
2590    } catch (UncheckedIOException exception) {
2591      throw exception.getCause();
2592    }
2593  }
2594
2595  /**
   * Get an assigned AsyncClusterConnection to the cluster. This method is thread safe.
2597   * @param user assigned user
2598   * @return An AsyncClusterConnection with assigned user.
2599   */
2600  public AsyncClusterConnection getAsyncConnection(User user) throws IOException {
2601    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
2602  }
2603
2604  public void closeConnection() throws IOException {
2605    if (hbaseAdmin != null) {
2606      Closeables.close(hbaseAdmin, true);
2607      hbaseAdmin = null;
2608    }
2609    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
2610    if (asyncConnection != null) {
2611      Closeables.close(asyncConnection, true);
2612    }
2613  }
2614
2615  /**
   * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing
   * it has no effect; it will be closed automatically when the cluster shuts down.
2618   */
2619  public Admin getAdmin() throws IOException {
2620    if (hbaseAdmin == null) {
2621      this.hbaseAdmin = getConnection().getAdmin();
2622    }
2623    return hbaseAdmin;
2624  }
2625
2626  private Admin hbaseAdmin = null;
2627
2628  /**
   * Returns an {@link Hbck} instance. Must be closed when done.
2630   */
2631  public Hbck getHbck() throws IOException {
2632    return getConnection().getHbck();
2633  }
2634
2635  /**
2636   * Unassign the named region.
2637   * @param regionName The region to unassign.
2638   */
2639  public void unassignRegion(String regionName) throws IOException {
2640    unassignRegion(Bytes.toBytes(regionName));
2641  }
2642
2643  /**
2644   * Unassign the named region.
2645   * @param regionName The region to unassign.
2646   */
2647  public void unassignRegion(byte[] regionName) throws IOException {
2648    getAdmin().unassign(regionName);
2649  }
2650
2651  /**
   * Unassigns the region containing the given row.
2653   * @param row   The row to find the containing region.
2654   * @param table The table to find the region.
2655   */
2656  public void unassignRegionByRow(String row, RegionLocator table) throws IOException {
2657    unassignRegionByRow(Bytes.toBytes(row), table);
2658  }
2659
2660  /**
   * Unassigns the region containing the given row.
2662   * @param row   The row to find the containing region.
2663   * @param table The table to find the region.
2664   */
2665  public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException {
2666    HRegionLocation hrl = table.getRegionLocation(row);
2667    unassignRegion(hrl.getRegion().getRegionName());
2668  }
2669
2670  /**
   * Retrieves a random splittable region from the given table
   * @param tableName   name of table
   * @param maxAttempts maximum number of attempts; -1 means unlimited
   * @return the HRegion chosen, or null if none was found within the limit of maxAttempts
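   * <p>
   * A usage sketch (the follow-up split call is illustrative):
   *
   * <pre>
   * HRegion region = TEST_UTIL.getSplittableRegion(tableName, 10);
   * if (region != null) {
   *   TEST_UTIL.getAdmin().splitRegionAsync(region.getRegionInfo().getRegionName()).get();
   * }
   * </pre>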
2675   */
2676  public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
2677    List<HRegion> regions = getHBaseCluster().getRegions(tableName);
2678    int regCount = regions.size();
2679    Set<Integer> attempted = new HashSet<>();
2680    int idx;
2681    int attempts = 0;
2682    do {
2683      regions = getHBaseCluster().getRegions(tableName);
2684      if (regCount != regions.size()) {
2685        // if there was region movement, clear attempted Set
2686        attempted.clear();
2687      }
2688      regCount = regions.size();
      // There is a chance that, before we get the region for the table from an RS, the region may
      // be going for CLOSE. This may happen when online schema change is enabled
2691      if (regCount > 0) {
2692        idx = ThreadLocalRandom.current().nextInt(regCount);
2693        // if we have just tried this region, there is no need to try again
2694        if (attempted.contains(idx)) {
2695          continue;
2696        }
2697        HRegion region = regions.get(idx);
2698        if (region.checkSplit().isPresent()) {
2699          return region;
2700        }
2701        attempted.add(idx);
2702      }
2703      attempts++;
2704    } while (maxAttempts == -1 || attempts < maxAttempts);
2705    return null;
2706  }
2707
2708  public MiniDFSCluster getDFSCluster() {
2709    return dfsCluster;
2710  }
2711
2712  public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
2713    setDFSCluster(cluster, true);
2714  }
2715
2716  /**
2717   * Set the MiniDFSCluster
2718   * @param cluster     cluster to use
   * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before it
   *                    is set.
2721   * @throws IllegalStateException if the passed cluster is up when it is required to be down
2722   * @throws IOException           if the FileSystem could not be set from the passed dfs cluster
2723   */
2724  public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
2725    throws IllegalStateException, IOException {
2726    if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
2727      throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
2728    }
2729    this.dfsCluster = cluster;
2730    this.setFs();
2731  }
2732
2733  public FileSystem getTestFileSystem() throws IOException {
2734    return HFileSystem.get(conf);
2735  }
2736
2737  /**
   * Wait until all regions in a table have been assigned. Waits up to the default timeout (30
   * seconds) before giving up.
2740   * @param table Table to wait on.
2741   */
2742  public void waitTableAvailable(TableName table) throws InterruptedException, IOException {
2743    waitTableAvailable(table.getName(), 30000);
2744  }
2745
2746  public void waitTableAvailable(TableName table, long timeoutMillis)
2747    throws InterruptedException, IOException {
2748    waitFor(timeoutMillis, predicateTableAvailable(table));
2749  }
2750
2751  /**
2752   * Wait until all regions in a table have been assigned
2753   * @param table         Table to wait on.
2754   * @param timeoutMillis Timeout.
2755   */
2756  public void waitTableAvailable(byte[] table, long timeoutMillis)
2757    throws InterruptedException, IOException {
2758    waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table)));
2759  }
2760
2761  public String explainTableAvailability(TableName tableName) throws IOException {
2762    StringBuilder msg =
2763      new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", ");
2764    if (getHBaseCluster().getMaster().isAlive()) {
2765      Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
2766        .getRegionStates().getRegionAssignments();
2767      final List<Pair<RegionInfo, ServerName>> metaLocations =
2768        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
2769      for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
2770        RegionInfo hri = metaLocation.getFirst();
2771        ServerName sn = metaLocation.getSecond();
        if (!assignments.containsKey(hri)) {
          msg.append(", region ").append(hri)
            .append(" not assigned, but found in meta, it is expected to be on ").append(sn);
        } else if (sn == null) {
          msg.append(", region ").append(hri).append(" assigned, but has no server in meta");
        } else if (!sn.equals(assignments.get(hri))) {
          msg.append(", region ").append(hri)
            .append(" assigned, but has different servers in meta and AM (").append(sn)
            .append(" <> ").append(assignments.get(hri)).append(")");
        }
2782      }
2783    }
2784    return msg.toString();
2785  }
2786
2787  public String explainTableState(final TableName table, TableState.State state)
2788    throws IOException {
2789    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
2790    if (tableState == null) {
2791      return "TableState in META: No table state in META for table " + table
2792        + " last state in meta (including deleted is " + findLastTableState(table) + ")";
2793    } else if (!tableState.inStates(state)) {
2794      return "TableState in META: Not " + state + " state, but " + tableState;
2795    } else {
2796      return "TableState in META: OK";
2797    }
2798  }
2799
2800  @Nullable
2801  public TableState findLastTableState(final TableName table) throws IOException {
2802    final AtomicReference<TableState> lastTableState = new AtomicReference<>(null);
2803    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
2804      @Override
2805      public boolean visit(Result r) throws IOException {
2806        if (!Arrays.equals(r.getRow(), table.getName())) {
2807          return false;
2808        }
2809        TableState state = CatalogFamilyFormat.getTableState(r);
2810        if (state != null) {
2811          lastTableState.set(state);
2812        }
2813        return true;
2814      }
2815    };
2816    MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE,
2817      Integer.MAX_VALUE, visitor);
2818    return lastTableState.get();
2819  }
2820
2821  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and all of
   * its regions have been assigned. Will timeout after the default period (30 seconds). Tolerates a
   * nonexistent table.
2825   * @param table the table to wait on.
2826   * @throws InterruptedException if interrupted while waiting
2827   * @throws IOException          if an IO problem is encountered
2828   */
2829  public void waitTableEnabled(TableName table) throws InterruptedException, IOException {
2830    waitTableEnabled(table, 30000);
2831  }
2832
2833  /**
   * Waits for a table to be 'enabled'. Enabled means that the table is set as 'enabled' and all of
   * its regions have been assigned.
2836   * @see #waitTableEnabled(TableName, long)
2837   * @param table         Table to wait on.
2838   * @param timeoutMillis Time to wait on it being marked enabled.
2839   */
2840  public void waitTableEnabled(byte[] table, long timeoutMillis)
2841    throws InterruptedException, IOException {
2842    waitTableEnabled(TableName.valueOf(table), timeoutMillis);
2843  }
2844
2845  public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException {
2846    waitFor(timeoutMillis, predicateTableEnabled(table));
2847  }
2848
2849  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'. Will
   * timeout after the default period (30 seconds).
2852   * @param table Table to wait on.
2853   */
2854  public void waitTableDisabled(byte[] table) throws InterruptedException, IOException {
2855    waitTableDisabled(table, 30000);
2856  }
2857
2858  public void waitTableDisabled(TableName table, long millisTimeout)
2859    throws InterruptedException, IOException {
2860    waitFor(millisTimeout, predicateTableDisabled(table));
2861  }
2862
2863  /**
   * Waits for a table to be 'disabled'. Disabled means that the table is set as 'disabled'.
2865   * @param table         Table to wait on.
2866   * @param timeoutMillis Time to wait on it being marked disabled.
2867   */
2868  public void waitTableDisabled(byte[] table, long timeoutMillis)
2869    throws InterruptedException, IOException {
2870    waitTableDisabled(TableName.valueOf(table), timeoutMillis);
2871  }
2872
2873  /**
2874   * Make sure that at least the specified number of region servers are running
2875   * @param num minimum number of region servers that should be running
2876   * @return true if we started some servers
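   * <p>
   * A sketch:
   *
   * <pre>
   * // Guarantee at least 3 live region servers before the test proceeds
   * TEST_UTIL.ensureSomeRegionServersAvailable(3);
   * </pre>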
2877   */
2878  public boolean ensureSomeRegionServersAvailable(final int num) throws IOException {
2879    boolean startedServer = false;
2880    SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster();
2881    for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) {
2882      LOG.info("Started new server=" + hbaseCluster.startRegionServer());
2883      startedServer = true;
2884    }
2885
2886    return startedServer;
2887  }
2888
2889  /**
2890   * Make sure that at least the specified number of region servers are running. We don't count the
2891   * ones that are currently stopping or are stopped.
2892   * @param num minimum number of region servers that should be running
2893   * @return true if we started some servers
2894   */
2895  public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException {
2896    boolean startedServer = ensureSomeRegionServersAvailable(num);
2897
2898    int nonStoppedServers = 0;
    for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) {
      HRegionServer hrs = rst.getRegionServer();
2902      if (hrs.isStopping() || hrs.isStopped()) {
2903        LOG.info("A region server is stopped or stopping:" + hrs);
2904      } else {
2905        nonStoppedServers++;
2906      }
2907    }
2908    for (int i = nonStoppedServers; i < num; ++i) {
2909      LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
2910      startedServer = true;
2911    }
2912    return startedServer;
2913  }
2914
2915  /**
   * This method creates a new user based on the currently logged-in one, appending the passed
   * suffix to its name. Use it to get new instances of FileSystem. Only works for
   * DistributedFileSystem w/o Kerberos.
   * @param c                     Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new User, distinct from the current one, suitable for getting a new FileSystem
   *         instance.
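   * <p>
   * A minimal usage sketch (the suffix is illustrative):
   *
   * <pre>
   * User user = getDifferentUser(conf, ".datanode");
   * FileSystem fs =
   *   user.runAs((PrivilegedExceptionAction&lt;FileSystem&gt;) () -&gt; FileSystem.get(conf));
   * </pre>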
2921   */
2922  public static User getDifferentUser(final Configuration c, final String differentiatingSuffix)
2923    throws IOException {
2924    FileSystem currentfs = FileSystem.get(c);
2925    if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) {
2926      return User.getCurrent();
2927    }
2928    // Else distributed filesystem. Make a new instance per daemon. Below
2929    // code is taken from the AppendTestUtil over in hdfs.
2930    String username = User.getCurrent().getName() + differentiatingSuffix;
2931    User user = User.createUserForTesting(c, username, new String[] { "supergroup" });
2932    return user;
2933  }
2934
2935  public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster)
2936    throws IOException {
2937    NavigableSet<String> online = new TreeSet<>();
2938    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
2939      try {
2940        for (RegionInfo region : ProtobufUtil
2941          .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
2942          online.add(region.getRegionNameAsString());
2943        }
2944      } catch (RegionServerStoppedException e) {
2945        // That's fine.
2946      }
2947    }
2948    return online;
2949  }
2950
2951  /**
   * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it is hard-coded to 5, which makes
   * tests linger. Here is the exception you'll see:
2954   *
2955   * <pre>
2956   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block
2957   * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block
2958   * blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683
2959   * failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
2960   * </pre>
2961   *
2962   * @param stream A DFSClient.DFSOutputStream.
2963   */
2964  public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) {
2965    try {
2966      Class<?>[] clazzes = DFSClient.class.getDeclaredClasses();
2967      for (Class<?> clazz : clazzes) {
2968        String className = clazz.getSimpleName();
2969        if (className.equals("DFSOutputStream")) {
2970          if (clazz.isInstance(stream)) {
2971            Field maxRecoveryErrorCountField =
2972              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
2973            maxRecoveryErrorCountField.setAccessible(true);
2974            maxRecoveryErrorCountField.setInt(stream, max);
2975            break;
2976          }
2977        }
2978      }
2979    } catch (Exception e) {
2980      LOG.info("Could not set max recovery field", e);
2981    }
2982  }
2983
2984  /**
   * Uses the assignment manager directly to assign the region, and waits until the specified
   * region has completed assignment.
2987   * @return true if the region is assigned false otherwise.
2988   */
2989  public boolean assignRegion(final RegionInfo regionInfo)
2990    throws IOException, InterruptedException {
2991    final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager();
2992    am.assign(regionInfo);
2993    return AssignmentTestingUtil.waitForAssignment(am, regionInfo);
2994  }
2995
2996  /**
   * Move a region to the destination server and wait until the region is completely moved and
   * online
2998   * @param destRegion region to move
2999   * @param destServer destination server of the region
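   * <p>
   * A usage sketch (picks the first region of an illustrative table and an arbitrary live server):
   *
   * <pre>
   * RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TableName.valueOf("testTable")).get(0);
   * ServerName dest = TEST_UTIL.getMiniHBaseCluster().getRegionServer(1).getServerName();
   * TEST_UTIL.moveRegionAndWait(hri, dest);
   * </pre>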
3000   */
3001  public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer)
3002    throws InterruptedException, IOException {
3003    HMaster master = getMiniHBaseCluster().getMaster();
3004    // TODO: Here we start the move. The move can take a while.
3005    getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer);
3006    while (true) {
3007      ServerName serverName =
3008        master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion);
3009      if (serverName != null && serverName.equals(destServer)) {
3010        assertRegionOnServer(destRegion, serverName, 2000);
3011        break;
3012      }
3013      Thread.sleep(10);
3014    }
3015  }
3016
3017  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a
   * configurable timeout value (default is 60 seconds). This means all regions have been deployed,
   * and the master has been informed and has updated hbase:meta with each region's deployed server.
3021   * @param tableName the table name
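   * <p>
   * A typical usage sketch (table and family names are illustrative):
   *
   * <pre>
   * TableName tn = TableName.valueOf("testTable");
   * TEST_UTIL.createTable(tn, Bytes.toBytes("cf"));
   * TEST_UTIL.waitUntilAllRegionsAssigned(tn);
   * </pre>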
3022   */
3023  public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3024    waitUntilAllRegionsAssigned(tableName,
3025      this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3026  }
3027
3028  /**
   * Wait until all system tables' regions get assigned
3030   */
3031  public void waitUntilAllSystemRegionsAssigned() throws IOException {
3032    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3033  }
3034
3035  /**
   * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until
   * timeout. This means all regions have been deployed, and the master has been informed and has
   * updated hbase:meta with each region's deployed server.
3039   * @param tableName the table name
3040   * @param timeout   timeout, in milliseconds
3041   */
3042  public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3043    throws IOException {
3044    if (!TableName.isMetaTableName(tableName)) {
3045      try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) {
3046        LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = "
3047          + timeout + "ms");
3048        waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() {
3049          @Override
3050          public String explainFailure() throws IOException {
3051            return explainTableAvailability(tableName);
3052          }
3053
3054          @Override
3055          public boolean evaluate() throws IOException {
3056            Scan scan = new Scan();
3057            scan.addFamily(HConstants.CATALOG_FAMILY);
3058            boolean tableFound = false;
3059            try (ResultScanner s = meta.getScanner(scan)) {
3060              for (Result r; (r = s.next()) != null;) {
3061                byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3062                RegionInfo info = RegionInfo.parseFromOrNull(b);
3063                if (info != null && info.getTable().equals(tableName)) {
3064                  // Get server hosting this region from catalog family. Return false if no server
3065                  // hosting this region, or if the server hosting this region was recently killed
3066                  // (for fault tolerance testing).
3067                  tableFound = true;
3068                  byte[] server =
3069                    r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3070                  if (server == null) {
3071                    return false;
3072                  } else {
3073                    byte[] startCode =
3074                      r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER);
3075                    ServerName serverName =
3076                      ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + ","
3077                        + Bytes.toLong(startCode));
3078                    if (
3079                      !getHBaseClusterInterface().isDistributedCluster()
3080                        && getHBaseCluster().isKilledRS(serverName)
3081                    ) {
3082                      return false;
3083                    }
3084                  }
3085                  if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) {
3086                    return false;
3087                  }
3088                }
3089              }
3090            }
3091            if (!tableFound) {
3092              LOG.warn(
3093                "Didn't find the entries for table " + tableName + " in meta, already deleted?");
3094            }
3095            return tableFound;
3096          }
3097        });
3098      }
3099    }
3100    LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states.");
3101    // check from the master state if we are using a mini cluster
3102    if (!getHBaseClusterInterface().isDistributedCluster()) {
3103      // So, all regions are in the meta table but make sure master knows of the assignments before
3104      // returning -- sometimes this can lag.
3105      HMaster master = getHBaseCluster().getMaster();
3106      final RegionStates states = master.getAssignmentManager().getRegionStates();
3107      waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3108        @Override
3109        public String explainFailure() throws IOException {
3110          return explainTableAvailability(tableName);
3111        }
3112
3113        @Override
3114        public boolean evaluate() throws IOException {
3115          List<RegionInfo> hris = states.getRegionsOfTable(tableName);
3116          return hris != null && !hris.isEmpty();
3117        }
3118      });
3119    }
3120    LOG.info("All regions for table " + tableName + " assigned.");
3121  }
3122
3123  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods of querying itself, and relies on StoreScanner.
3126   */
3127  public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException {
3128    Scan scan = new Scan(get);
3129    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3130      scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()),
3131      // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set
3132      // readpoint 0.
3133      0);
3134
3135    List<Cell> result = new ArrayList<>();
3136    scanner.next(result);
3137    if (!result.isEmpty()) {
3138      // verify that we are on the row we want:
3139      Cell kv = result.get(0);
3140      if (!CellUtil.matchingRows(kv, get.getRow())) {
3141        result.clear();
3142      }
3143    }
3144    scanner.close();
3145    return result;
3146  }
3147
3148  /**
   * Create region split keys between startKey and endKey
   * @param numRegions the number of regions to be created; it has to be greater than 3.
3151   * @return resulting split keys
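   * <p>
   * A sketch (bounds are illustrative):
   *
   * <pre>
   * // Yields numRegions start keys; the first is HConstants.EMPTY_BYTE_ARRAY
   * byte[][] startKeys =
   *   TEST_UTIL.getRegionSplitStartKeys(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 5);
   * </pre>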
3152   */
3153  public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) {
3154    assertTrue(numRegions > 3);
3155    byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3156    byte[][] result = new byte[tmpSplitKeys.length + 1][];
3157    System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3158    result[0] = HConstants.EMPTY_BYTE_ARRAY;
3159    return result;
3160  }
3161
3162  /**
   * Do a small get/scan against one store. This is required because the store has no actual
   * methods of querying itself, and relies on StoreScanner.
3165   */
3166  public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns)
3167    throws IOException {
3168    Get get = new Get(row);
3169    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3170    s.put(store.getColumnFamilyDescriptor().getName(), columns);
3171
3172    return getFromStoreFile(store, get);
3173  }
3174
3175  public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
3176    final List<? extends Cell> actual) {
3177    final int eLen = expected.size();
3178    final int aLen = actual.size();
3179    final int minLen = Math.min(eLen, aLen);
3180
3181    int i = 0;
3182    while (
3183      i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0
3184    ) {
3185      i++;
3186    }
3187
3188    if (additionalMsg == null) {
3189      additionalMsg = "";
3190    }
3191    if (!additionalMsg.isEmpty()) {
3192      additionalMsg = ". " + additionalMsg;
3193    }
3194
3195    if (eLen != aLen || i != minLen) {
3196      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
3197        + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i)
3198        + " (length " + aLen + ")" + additionalMsg);
3199    }
3200  }
3201
3202  public static <T> String safeGetAsStr(List<T> lst, int i) {
3203    if (0 <= i && i < lst.size()) {
3204      return lst.get(i).toString();
3205    } else {
3206      return "<out_of_range>";
3207    }
3208  }
3209
3210  public String getRpcConnnectionURI() throws UnknownHostException {
3211    return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf);
3212  }
3213
3214  public String getZkConnectionURI() {
3215    return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3216      + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3217      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3218  }
3219
3220  /**
3221   * Get the zk based cluster key for this cluster.
3222   * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the
3223   *             connection info of a cluster. Keep here only for compatibility.
3224   * @see #getRpcConnnectionURI()
3225   * @see #getZkConnectionURI()
3226   */
3227  @Deprecated
3228  public String getClusterKey() {
3229    return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
3230      + ":"
3231      + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3232  }
3233
3234  /**
3235   * Creates a random table with the given parameters
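   * <p>
   * A minimal invocation sketch (all values are illustrative):
   *
   * <pre>
   * // args: families, maxVersions, numColsPerRow, numFlushes, numRegions, numRowsPerFlush
   * Table table = TEST_UTIL.createRandomTable(TableName.valueOf("testTable"),
   *   Arrays.asList("cf1", "cf2"), 3, 10, 2, 5, 100);
   * </pre>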
3236   */
3237  public Table createRandomTable(TableName tableName, final Collection<String> families,
3238    final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions,
3239    final int numRowsPerFlush) throws IOException, InterruptedException {
3240    LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, "
3241      + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions="
3242      + maxVersions + "\n");
3243
3244    final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3245    final int numCF = families.size();
3246    final byte[][] cfBytes = new byte[numCF][];
3247    {
3248      int cfIndex = 0;
3249      for (String cf : families) {
3250        cfBytes[cfIndex++] = Bytes.toBytes(cf);
3251      }
3252    }
3253
3254    final int actualStartKey = 0;
3255    final int actualEndKey = Integer.MAX_VALUE;
3256    final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3257    final int splitStartKey = actualStartKey + keysPerRegion;
3258    final int splitEndKey = actualEndKey - keysPerRegion;
3259    final String keyFormat = "%08x";
3260    final Table table = createTable(tableName, cfBytes, maxVersions,
3261      Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3262      Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions);
3263
3264    if (hbaseCluster != null) {
3265      getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3266    }
3267
3268    BufferedMutator mutator = getConnection().getBufferedMutator(tableName);
3269
3270    for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3271      for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3272        final byte[] row = Bytes.toBytes(
3273          String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3274
3275        Put put = new Put(row);
3276        Delete del = new Delete(row);
3277        for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3278          final byte[] cf = cfBytes[rand.nextInt(numCF)];
3279          final long ts = rand.nextInt();
3280          final byte[] qual = Bytes.toBytes("col" + iCol);
3281          if (rand.nextBoolean()) {
3282            final byte[] value =
3283              Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_"
3284                + iCol + "_ts_" + ts + "_random_" + rand.nextLong());
3285            put.addColumn(cf, qual, ts, value);
3286          } else if (rand.nextDouble() < 0.8) {
3287            del.addColumn(cf, qual, ts);
3288          } else {
3289            del.addColumns(cf, qual, ts);
3290          }
3291        }
3292
3293        if (!put.isEmpty()) {
3294          mutator.mutate(put);
3295        }
3296
3297        if (!del.isEmpty()) {
3298          mutator.mutate(del);
3299        }
3300      }
3301      LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3302      mutator.flush();
3303      if (hbaseCluster != null) {
3304        getMiniHBaseCluster().flushcache(table.getName());
3305      }
3306    }
3307    mutator.close();
3308
3309    return table;
3310  }
3311
3312  public static int randomFreePort() {
3313    return HBaseCommonTestingUtil.randomFreePort();
3314  }
3315
3316  public static String randomMultiCastAddress() {
3317    return "226.1.1." + ThreadLocalRandom.current().nextInt(254);
3318  }
3319
3320  public static void waitForHostPort(String host, int port) throws IOException {
3321    final int maxTimeMs = 10000;
3322    final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3323    IOException savedException = null;
3324    LOG.info("Waiting for server at " + host + ":" + port);
3325    for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3326      try {
3327        Socket sock = new Socket(InetAddress.getByName(host), port);
3328        sock.close();
3329        savedException = null;
3330        LOG.info("Server at " + host + ":" + port + " is available");
3331        break;
3332      } catch (UnknownHostException e) {
3333        throw new IOException("Failed to look up " + host, e);
3334      } catch (IOException e) {
3335        savedException = e;
3336      }
3337      Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3338    }
3339
3340    if (savedException != null) {
3341      throw savedException;
3342    }
3343  }
3344
3345  public static int getMetaRSPort(Connection connection) throws IOException {
3346    try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
3347      return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3348    }
3349  }
3350
3351  /**
   * Due to an async racing issue, a region may not be in the online region list of a region server
   * yet, even after the assignment znode is deleted and the new assignment is recorded in master.
3354   */
3355  public void assertRegionOnServer(final RegionInfo hri, final ServerName server,
3356    final long timeout) throws IOException, InterruptedException {
3357    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3358    while (true) {
3359      List<RegionInfo> regions = getAdmin().getRegions(server);
3360      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return;
3361      long now = EnvironmentEdgeManager.currentTime();
3362      if (now > timeoutTime) break;
3363      Thread.sleep(10);
3364    }
3365    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3366  }
3367
3368  /**
3369   * Check to make sure the region is open on the specified region server, but not on any other one.
3370   */
3371  public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server,
3372    final long timeout) throws IOException, InterruptedException {
3373    long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout;
3374    while (true) {
3375      List<RegionInfo> regions = getAdmin().getRegions(server);
3376      if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) {
3377        List<JVMClusterUtil.RegionServerThread> rsThreads =
3378          getHBaseCluster().getLiveRegionServerThreads();
3379        for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) {
3380          HRegionServer rs = rsThread.getRegionServer();
3381          if (server.equals(rs.getServerName())) {
3382            continue;
3383          }
3384          Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext();
3385          for (HRegion r : hrs) {
3386            assertTrue("Region should not be double assigned",
3387              r.getRegionInfo().getRegionId() != hri.getRegionId());
3388          }
3389        }
3390        return; // good, we are happy
3391      }
3392      long now = EnvironmentEdgeManager.currentTime();
3393      if (now > timeoutTime) break;
3394      Thread.sleep(10);
3395    }
3396    fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server);
3397  }
3398
3399  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
3400    TableDescriptor td =
3401      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3402    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3403    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
3404  }
3405
3406  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
3407    BlockCache blockCache) throws IOException {
3408    TableDescriptor td =
3409      TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
3410    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
3411    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
3412  }
3413
3414  public static void setFileSystemURI(String fsURI) {
3415    FS_URI = fsURI;
3416  }
3417
3418  /**
3419   * Returns a {@link Predicate} for checking that there are no regions in transition in master
3420   */
3421  public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
3422    return new ExplainingPredicate<IOException>() {
3423      @Override
3424      public String explainFailure() throws IOException {
3425        final RegionStates regionStates =
3426          getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
3427        return "found in transition: " + regionStates.getRegionsInTransition().toString();
3428      }
3429
3430      @Override
3431      public boolean evaluate() throws IOException {
3432        HMaster master = getMiniHBaseCluster().getMaster();
3433        if (master == null) return false;
3434        AssignmentManager am = master.getAssignmentManager();
3435        if (am == null) return false;
3436        return !am.hasRegionsInTransition();
3437      }
3438    };
3439  }
3440
3441  /**
   * Returns a {@link Predicate} for checking that the table is enabled
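   * <p>
   * Typically passed to {@code waitFor}; a sketch:
   *
   * <pre>
   * TEST_UTIL.waitFor(30000, TEST_UTIL.predicateTableEnabled(tableName));
   * </pre>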
3443   */
3444  public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
3445    return new ExplainingPredicate<IOException>() {
3446      @Override
3447      public String explainFailure() throws IOException {
3448        return explainTableState(tableName, TableState.State.ENABLED);
3449      }
3450
3451      @Override
3452      public boolean evaluate() throws IOException {
3453        return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName);
3454      }
3455    };
3456  }
3457
3458  /**
   * Returns a {@link Predicate} for checking that the table is disabled
3460   */
3461  public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
3462    return new ExplainingPredicate<IOException>() {
3463      @Override
3464      public String explainFailure() throws IOException {
3465        return explainTableState(tableName, TableState.State.DISABLED);
3466      }
3467
3468      @Override
3469      public boolean evaluate() throws IOException {
3470        return getAdmin().isTableDisabled(tableName);
3471      }
3472    };
3473  }
3474
3475  /**
   * Returns a {@link Predicate} for checking that the table is available
3477   */
3478  public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
3479    return new ExplainingPredicate<IOException>() {
3480      @Override
3481      public String explainFailure() throws IOException {
3482        return explainTableAvailability(tableName);
3483      }
3484
3485      @Override
3486      public boolean evaluate() throws IOException {
3487        boolean tableAvailable = getAdmin().isTableAvailable(tableName);
3488        if (tableAvailable) {
3489          try (Table table = getConnection().getTable(tableName)) {
3490            TableDescriptor htd = table.getDescriptor();
3491            for (HRegionLocation loc : getConnection().getRegionLocator(tableName)
3492              .getAllRegionLocations()) {
3493              Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey())
3494                .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit()
3495                .setMaxResultsPerColumnFamily(1).setCacheBlocks(false);
3496              for (byte[] family : htd.getColumnFamilyNames()) {
3497                scan.addFamily(family);
3498              }
3499              try (ResultScanner scanner = table.getScanner(scan)) {
3500                scanner.next();
3501              }
3502            }
3503          }
3504        }
3505        return tableAvailable;
3506      }
3507    };
3508  }
3509
3510  /**
3511   * Wait until no regions in transition.
3512   * @param timeout How long to wait.
3513   */
3514  public void waitUntilNoRegionsInTransition(final long timeout) throws IOException {
3515    waitFor(timeout, predicateNoRegionsInTransition());
3516  }
3517
3518  /**
3519   * Wait until no regions in transition. (time limit 15min)
3520   */
3521  public void waitUntilNoRegionsInTransition() throws IOException {
3522    waitUntilNoRegionsInTransition(15 * 60000);
3523  }
3524
3525  /**
   * Wait until the given labels are ready in VisibilityLabelsCache.
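   * <p>
   * A sketch (labels are illustrative):
   *
   * <pre>
   * TEST_UTIL.waitLabelAvailable(10000, "secret", "confidential");
   * </pre>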
3527   */
3528  public void waitLabelAvailable(long timeoutMillis, final String... labels) {
3529    final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
3530    waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
3531
3532      @Override
3533      public boolean evaluate() {
3534        for (String label : labels) {
3535          if (labelsCache.getLabelOrdinal(label) == 0) {
3536            return false;
3537          }
3538        }
3539        return true;
3540      }
3541
3542      @Override
3543      public String explainFailure() {
3544        for (String label : labels) {
3545          if (labelsCache.getLabelOrdinal(label) == 0) {
3546            return label + " is not available yet";
3547          }
3548        }
3549        return "";
3550      }
3551    });
3552  }
3553
3554  /**
   * Create a set of column descriptors with the combination of compression, encoding, and bloom
   * codecs available.
3557   * @return the list of column descriptors
3558   */
3559  public static List<ColumnFamilyDescriptor> generateColumnDescriptors() {
3560    return generateColumnDescriptors("");
3561  }
3562
3563  /**
   * Create a set of column descriptors with the combination of compression, encoding, and bloom
   * codecs available.
3566   * @param prefix family names prefix
3567   * @return the list of column descriptors
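   * <p>
   * A sketch of feeding the generated families into a table descriptor (names illustrative):
   *
   * <pre>
   * TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testTable"))
   *   .setColumnFamilies(generateColumnDescriptors("test")).build();
   * </pre>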
3568   */
3569  public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) {
3570    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
3571    long familyId = 0;
3572    for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) {
3573      for (DataBlockEncoding encodingType : DataBlockEncoding.values()) {
3574        for (BloomType bloomType : BloomType.values()) {
3575          String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
3576          ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
3577            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name));
3578          columnFamilyDescriptorBuilder.setCompressionType(compressionType);
3579          columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType);
3580          columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
3581          columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build());
3582          familyId++;
3583        }
3584      }
3585    }
3586    return columnFamilyDescriptors;
3587  }
3588
3589  /**
3590   * Get supported compression algorithms.
3591   * @return supported compression algorithms.
3592   */
3593  public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
3594    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
3595    List<Compression.Algorithm> supportedAlgos = new ArrayList<>();
3596    for (String algoName : allAlgos) {
3597      try {
3598        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
3599        algo.getCompressor();
3600        supportedAlgos.add(algo);
3601      } catch (Throwable t) {
3602        // this algo is not available
3603      }
3604    }
3605    return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
3606  }
3607
3608  public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException {
3609    Scan scan = new Scan().withStartRow(row);
3610    scan.setReadType(ReadType.PREAD);
3611    scan.setCaching(1);
3612    scan.setReversed(true);
3613    scan.addFamily(family);
3614    try (RegionScanner scanner = r.getScanner(scan)) {
3615      List<Cell> cells = new ArrayList<>(1);
3616      scanner.next(cells);
      // Guard against an empty scan result before inspecting the first cell.
      if (
        r.getRegionInfo().isMetaRegion() && (cells.isEmpty() || !isTargetTable(row, cells.get(0)))
      ) {
        return null;
      }
3620      return Result.create(cells);
3621    }
3622  }
3623
3624  private boolean isTargetTable(final byte[] inRow, Cell c) {
3625    String inputRowString = Bytes.toString(inRow);
3626    int i = inputRowString.indexOf(HConstants.DELIMITER);
3627    String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength());
3628    int o = outputRowString.indexOf(HConstants.DELIMITER);
3629    return inputRowString.substring(0, i).equals(outputRowString.substring(0, o));
3630  }
3631
3632  /**
3633   * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given
3634   * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use
3635   * kerby KDC server and utility for using it,
3636   * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred;
   * less baggage. It came in with HBASE-5291.
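   * <p>
   * A minimal usage sketch (the principal name is illustrative):
   *
   * <pre>
   * File keytab = new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
   * MiniKdc kdc = TEST_UTIL.setupMiniKdc(keytab);
   * kdc.createPrincipal(keytab, "user/localhost");
   * // ... run the secure test ...
   * kdc.stop();
   * </pre>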
3638   */
3639  public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
3640    Properties conf = MiniKdc.createConf();
3641    conf.put(MiniKdc.DEBUG, true);
3642    MiniKdc kdc = null;
3643    File dir = null;
    // There is a time lag between selecting a port and trying to bind to it. It's possible that
    // another service captures the port in between, which will result in a BindException.
3646    boolean bindException;
3647    int numTries = 0;
3648    do {
3649      try {
3650        bindException = false;
3651        dir = new File(getDataTestDir("kdc").toUri().getPath());
3652        kdc = new MiniKdc(conf, dir);
3653        kdc.start();
3654      } catch (BindException e) {
3655        FileUtils.deleteDirectory(dir); // clean directory
3656        numTries++;
3657        if (numTries == 3) {
3658          LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
3659          throw e;
3660        }
3661        LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
3662        bindException = true;
3663      }
3664    } while (bindException);
3665    HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
3666    return kdc;
3667  }
3668
3669  public int getNumHFiles(final TableName tableName, final byte[] family) {
3670    int numHFiles = 0;
3671    for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
3672      numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family);
3673    }
3674    return numHFiles;
3675  }
3676
3677  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
3678    final byte[] family) {
3679    int numHFiles = 0;
3680    for (Region region : rs.getRegions(tableName)) {
3681      numHFiles += region.getStore(family).getStorefilesCount();
3682    }
3683    return numHFiles;
3684  }
3685
3686  public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) {
3687    assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode());
3688    Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies());
3689    Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies());
3690    assertEquals(ltdFamilies.size(), rtdFamilies.size());
3691    for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(),
3692        it2 = rtdFamilies.iterator(); it.hasNext();) {
3693      assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
3694    }
3695  }
3696
3697  /**
3698   * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
3699   * invocations.
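   * <p>
   * A sketch ({@code cluster} and {@code expectedCount} are assumed to be in scope):
   *
   * <pre>
   * // Poll every 100 ms until the live region server count reaches the expected value
   * await(100, () -&gt; cluster.getLiveRegionServerThreads().size() == expectedCount);
   * </pre>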
3700   */
3701  public static void await(final long sleepMillis, final BooleanSupplier condition)
3702    throws InterruptedException {
3703    try {
3704      while (!condition.getAsBoolean()) {
3705        Thread.sleep(sleepMillis);
3706      }
3707    } catch (RuntimeException e) {
3708      if (e.getCause() instanceof AssertionError) {
3709        throw (AssertionError) e.getCause();
3710      }
3711      throw e;
3712    }
3713  }
3714}