001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import edu.umd.cs.findbugs.annotations.Nullable; 025import java.io.File; 026import java.io.IOException; 027import java.io.OutputStream; 028import java.io.UncheckedIOException; 029import java.lang.reflect.Field; 030import java.lang.reflect.Modifier; 031import java.net.BindException; 032import java.net.DatagramSocket; 033import java.net.InetAddress; 034import java.net.ServerSocket; 035import java.net.Socket; 036import java.net.UnknownHostException; 037import java.nio.charset.StandardCharsets; 038import java.security.MessageDigest; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.HashSet; 044import java.util.Iterator; 045import java.util.List; 046import java.util.Map; 047import java.util.NavigableSet; 048import java.util.Properties; 049import java.util.Random; 050import java.util.Set; 051import java.util.TreeSet; 052import java.util.concurrent.ExecutionException; 053import java.util.concurrent.ThreadLocalRandom; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicReference; 056import java.util.function.BooleanSupplier; 057import org.apache.commons.io.FileUtils; 058import org.apache.commons.lang3.RandomStringUtils; 059import org.apache.hadoop.conf.Configuration; 060import org.apache.hadoop.fs.FileSystem; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 063import org.apache.hadoop.hbase.Waiter.Predicate; 064import org.apache.hadoop.hbase.client.Admin; 065import org.apache.hadoop.hbase.client.AsyncAdmin; 066import org.apache.hadoop.hbase.client.AsyncClusterConnection; 067import org.apache.hadoop.hbase.client.BufferedMutator; 068import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 070import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 071import org.apache.hadoop.hbase.client.Connection; 072import org.apache.hadoop.hbase.client.ConnectionFactory; 073import org.apache.hadoop.hbase.client.Consistency; 074import org.apache.hadoop.hbase.client.Delete; 075import org.apache.hadoop.hbase.client.Durability; 076import org.apache.hadoop.hbase.client.Get; 077import org.apache.hadoop.hbase.client.Hbck; 078import org.apache.hadoop.hbase.client.MasterRegistry; 079import org.apache.hadoop.hbase.client.Put; 080import org.apache.hadoop.hbase.client.RegionInfo; 081import org.apache.hadoop.hbase.client.RegionInfoBuilder; 082import org.apache.hadoop.hbase.client.RegionLocator; 083import org.apache.hadoop.hbase.client.Result; 084import org.apache.hadoop.hbase.client.ResultScanner; 085import org.apache.hadoop.hbase.client.Scan; 086import org.apache.hadoop.hbase.client.Scan.ReadType; 087import org.apache.hadoop.hbase.client.Table; 088import org.apache.hadoop.hbase.client.TableDescriptor; 089import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 090import org.apache.hadoop.hbase.client.TableState; 091import org.apache.hadoop.hbase.fs.HFileSystem; 092import org.apache.hadoop.hbase.io.compress.Compression; 093import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 094import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 095import org.apache.hadoop.hbase.io.hfile.BlockCache; 096import org.apache.hadoop.hbase.io.hfile.ChecksumUtil; 097import org.apache.hadoop.hbase.io.hfile.HFile; 098import org.apache.hadoop.hbase.ipc.RpcServerInterface; 099import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim; 100import org.apache.hadoop.hbase.master.HMaster; 101import org.apache.hadoop.hbase.master.RegionState; 102import org.apache.hadoop.hbase.master.ServerManager; 103import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 104import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil; 105import org.apache.hadoop.hbase.master.assignment.RegionStateStore; 106import org.apache.hadoop.hbase.master.assignment.RegionStates; 107import org.apache.hadoop.hbase.mob.MobFileCache; 108import org.apache.hadoop.hbase.regionserver.BloomType; 109import org.apache.hadoop.hbase.regionserver.ChunkCreator; 110import org.apache.hadoop.hbase.regionserver.HRegion; 111import org.apache.hadoop.hbase.regionserver.HRegionServer; 112import org.apache.hadoop.hbase.regionserver.HStore; 113import org.apache.hadoop.hbase.regionserver.InternalScanner; 114import org.apache.hadoop.hbase.regionserver.MemStoreLAB; 115import org.apache.hadoop.hbase.regionserver.Region; 116import org.apache.hadoop.hbase.regionserver.RegionScanner; 117import org.apache.hadoop.hbase.regionserver.RegionServerServices; 118import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 119import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 120import org.apache.hadoop.hbase.security.User; 121import org.apache.hadoop.hbase.security.UserProvider; 122import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache; 123import org.apache.hadoop.hbase.util.Bytes; 124import org.apache.hadoop.hbase.util.CommonFSUtils; 125import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 126import org.apache.hadoop.hbase.util.FSUtils; 127import org.apache.hadoop.hbase.util.JVM; 128import org.apache.hadoop.hbase.util.JVMClusterUtil; 129import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 130import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 131import org.apache.hadoop.hbase.util.Pair; 132import org.apache.hadoop.hbase.util.ReflectionUtils; 133import org.apache.hadoop.hbase.util.RetryCounter; 134import org.apache.hadoop.hbase.util.Threads; 135import org.apache.hadoop.hbase.wal.WAL; 136import org.apache.hadoop.hbase.wal.WALFactory; 137import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; 138import org.apache.hadoop.hbase.zookeeper.ZKConfig; 139import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 140import org.apache.hadoop.hdfs.DFSClient; 141import org.apache.hadoop.hdfs.DistributedFileSystem; 142import org.apache.hadoop.hdfs.MiniDFSCluster; 143import org.apache.hadoop.hdfs.server.datanode.DataNode; 144import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; 145import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; 146import org.apache.hadoop.mapred.JobConf; 147import org.apache.hadoop.mapred.MiniMRCluster; 148import org.apache.hadoop.mapred.TaskLog; 149import org.apache.hadoop.minikdc.MiniKdc; 150import org.apache.yetus.audience.InterfaceAudience; 151import org.apache.yetus.audience.InterfaceStability; 152import org.apache.zookeeper.WatchedEvent; 153import org.apache.zookeeper.ZooKeeper; 154import org.apache.zookeeper.ZooKeeper.States; 155 156import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 157 158import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 159 160/** 161 * Facility for testing HBase. Replacement for old HBaseTestCase and HBaseClusterTestCase 162 * functionality. Create an instance and keep it around testing HBase. 163 * <p/> 164 * This class is meant to be your one-stop shop for anything you might need testing. Manages one 165 * cluster at a time only. Managed cluster can be an in-process {@link SingleProcessHBaseCluster}, 166 * or a deployed cluster of type {@code DistributedHBaseCluster}. Not all methods work with the real 167 * cluster. 168 * <p/> 169 * Depends on log4j being on classpath and hbase-site.xml for logging and test-run configuration. 170 * <p/> 171 * It does not set logging levels. 172 * <p/> 173 * In the configuration properties, default values for master-info-port and region-server-port are 174 * overridden such that a random port will be assigned (thus avoiding port contention if another 175 * local HBase instance is already running). 176 * <p/> 177 * To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" 178 * setting it to true. 179 */ 180@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.PHOENIX) 181@InterfaceStability.Evolving 182public class HBaseTestingUtil extends HBaseZKTestingUtil { 183 184 public static final int DEFAULT_REGIONS_PER_SERVER = 3; 185 186 private MiniDFSCluster dfsCluster = null; 187 private FsDatasetAsyncDiskServiceFixer dfsClusterFixer = null; 188 189 private volatile HBaseClusterInterface hbaseCluster = null; 190 private MiniMRCluster mrCluster = null; 191 192 /** If there is a mini cluster running for this testing utility instance. */ 193 private volatile boolean miniClusterRunning; 194 195 private String hadoopLogDir; 196 197 /** 198 * Directory on test filesystem where we put the data for this instance of HBaseTestingUtility 199 */ 200 private Path dataTestDirOnTestFS = null; 201 202 private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>(); 203 204 /** Filesystem URI used for map-reduce mini-cluster setup */ 205 private static String FS_URI; 206 207 /** This is for unit tests parameterized with a single boolean. */ 208 public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination(); 209 210 /** 211 * Checks to see if a specific port is available. 212 * @param port the port number to check for availability 213 * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not 214 */ 215 public static boolean available(int port) { 216 ServerSocket ss = null; 217 DatagramSocket ds = null; 218 try { 219 ss = new ServerSocket(port); 220 ss.setReuseAddress(true); 221 ds = new DatagramSocket(port); 222 ds.setReuseAddress(true); 223 return true; 224 } catch (IOException e) { 225 // Do nothing 226 } finally { 227 if (ds != null) { 228 ds.close(); 229 } 230 231 if (ss != null) { 232 try { 233 ss.close(); 234 } catch (IOException e) { 235 /* should not be thrown */ 236 } 237 } 238 } 239 240 return false; 241 } 242 243 /** 244 * Create all combinations of Bloom filters and compression algorithms for testing. 245 */ 246 private static List<Object[]> bloomAndCompressionCombinations() { 247 List<Object[]> configurations = new ArrayList<>(); 248 for (Compression.Algorithm comprAlgo : HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS) { 249 for (BloomType bloomType : BloomType.values()) { 250 configurations.add(new Object[] { comprAlgo, bloomType }); 251 } 252 } 253 return Collections.unmodifiableList(configurations); 254 } 255 256 /** 257 * Create combination of memstoreTS and tags 258 */ 259 private static List<Object[]> memStoreTSAndTagsCombination() { 260 List<Object[]> configurations = new ArrayList<>(); 261 configurations.add(new Object[] { false, false }); 262 configurations.add(new Object[] { false, true }); 263 configurations.add(new Object[] { true, false }); 264 configurations.add(new Object[] { true, true }); 265 return Collections.unmodifiableList(configurations); 266 } 267 268 public static List<Object[]> memStoreTSTagsAndOffheapCombination() { 269 List<Object[]> configurations = new ArrayList<>(); 270 configurations.add(new Object[] { false, false, true }); 271 configurations.add(new Object[] { false, false, false }); 272 configurations.add(new Object[] { false, true, true }); 273 configurations.add(new Object[] { false, true, false }); 274 configurations.add(new Object[] { true, false, true }); 275 configurations.add(new Object[] { true, false, false }); 276 configurations.add(new Object[] { true, true, true }); 277 configurations.add(new Object[] { true, true, false }); 278 return Collections.unmodifiableList(configurations); 279 } 280 281 public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = 282 bloomAndCompressionCombinations(); 283 284 /** 285 * <p> 286 * Create an HBaseTestingUtility using a default configuration. 287 * <p> 288 * Initially, all tmp files are written to a local test data directory. Once 289 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 290 * data will be written to the DFS directory instead. 291 */ 292 public HBaseTestingUtil() { 293 this(HBaseConfiguration.create()); 294 } 295 296 /** 297 * <p> 298 * Create an HBaseTestingUtility using a given configuration. 299 * <p> 300 * Initially, all tmp files are written to a local test data directory. Once 301 * {@link #startMiniDFSCluster} is called, either directly or via {@link #startMiniCluster()}, tmp 302 * data will be written to the DFS directory instead. 303 * @param conf The configuration to use for further operations 304 */ 305 public HBaseTestingUtil(@Nullable Configuration conf) { 306 super(conf); 307 308 // a hbase checksum verification failure will cause unit tests to fail 309 ChecksumUtil.generateExceptionForChecksumFailureForTest(true); 310 311 // Save this for when setting default file:// breaks things 312 if (this.conf.get("fs.defaultFS") != null) { 313 this.conf.set("original.defaultFS", this.conf.get("fs.defaultFS")); 314 } 315 if (this.conf.get(HConstants.HBASE_DIR) != null) { 316 this.conf.set("original.hbase.dir", this.conf.get(HConstants.HBASE_DIR)); 317 } 318 // Every cluster is a local cluster until we start DFS 319 // Note that conf could be null, but this.conf will not be 320 String dataTestDir = getDataTestDir().toString(); 321 this.conf.set("fs.defaultFS", "file:///"); 322 this.conf.set(HConstants.HBASE_DIR, "file://" + dataTestDir); 323 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 324 this.conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false); 325 // If the value for random ports isn't set set it to true, thus making 326 // tests opt-out for random port assignment 327 this.conf.setBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, 328 this.conf.getBoolean(LocalHBaseCluster.ASSIGN_RANDOM_PORTS, true)); 329 } 330 331 /** 332 * Close both the region {@code r} and it's underlying WAL. For use in tests. 333 */ 334 public static void closeRegionAndWAL(final Region r) throws IOException { 335 closeRegionAndWAL((HRegion) r); 336 } 337 338 /** 339 * Close both the HRegion {@code r} and it's underlying WAL. For use in tests. 340 */ 341 public static void closeRegionAndWAL(final HRegion r) throws IOException { 342 if (r == null) return; 343 r.close(); 344 if (r.getWAL() == null) return; 345 r.getWAL().close(); 346 } 347 348 /** 349 * Returns this classes's instance of {@link Configuration}. Be careful how you use the returned 350 * Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed 351 * by the Configuration. If say, a Connection was being used against a cluster that had been 352 * shutdown, see {@link #shutdownMiniCluster()}, then the Connection will no longer be wholesome. 353 * Rather than use the return direct, its usually best to make a copy and use that. Do 354 * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code> 355 * @return Instance of Configuration. 356 */ 357 @Override 358 public Configuration getConfiguration() { 359 return super.getConfiguration(); 360 } 361 362 public void setHBaseCluster(HBaseClusterInterface hbaseCluster) { 363 this.hbaseCluster = hbaseCluster; 364 } 365 366 /** 367 * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}. Give it a random name so can 368 * have many concurrent tests running if we need to. Moding a System property is not the way to do 369 * concurrent instances -- another instance could grab the temporary value unintentionally -- but 370 * not anything can do about it at moment; single instance only is how the minidfscluster works. 371 * We also create the underlying directory names for hadoop.log.dir, mapreduce.cluster.local.dir 372 * and hadoop.tmp.dir, and set the values in the conf, and as a system property for hadoop.tmp.dir 373 * (We do not create them!). 374 * @return The calculated data test build directory, if newly-created. 375 */ 376 @Override 377 protected Path setupDataTestDir() { 378 Path testPath = super.setupDataTestDir(); 379 if (null == testPath) { 380 return null; 381 } 382 383 createSubDirAndSystemProperty("hadoop.log.dir", testPath, "hadoop-log-dir"); 384 385 // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but 386 // we want our own value to ensure uniqueness on the same machine 387 createSubDirAndSystemProperty("hadoop.tmp.dir", testPath, "hadoop-tmp-dir"); 388 389 // Read and modified in org.apache.hadoop.mapred.MiniMRCluster 390 createSubDir("mapreduce.cluster.local.dir", testPath, "mapred-local-dir"); 391 return testPath; 392 } 393 394 private void createSubDirAndSystemProperty(String propertyName, Path parent, String subDirName) { 395 396 String sysValue = System.getProperty(propertyName); 397 398 if (sysValue != null) { 399 // There is already a value set. So we do nothing but hope 400 // that there will be no conflicts 401 LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " + sysValue 402 + " so I do NOT create it in " + parent); 403 String confValue = conf.get(propertyName); 404 if (confValue != null && !confValue.endsWith(sysValue)) { 405 LOG.warn(propertyName + " property value differs in configuration and system: " 406 + "Configuration=" + confValue + " while System=" + sysValue 407 + " Erasing configuration value by system value."); 408 } 409 conf.set(propertyName, sysValue); 410 } else { 411 // Ok, it's not set, so we create it as a subdirectory 412 createSubDir(propertyName, parent, subDirName); 413 System.setProperty(propertyName, conf.get(propertyName)); 414 } 415 } 416 417 /** 418 * @return Where to write test data on the test filesystem; Returns working directory for the test 419 * filesystem by default 420 * @see #setupDataTestDirOnTestFS() 421 * @see #getTestFileSystem() 422 */ 423 private Path getBaseTestDirOnTestFS() throws IOException { 424 FileSystem fs = getTestFileSystem(); 425 return new Path(fs.getWorkingDirectory(), "test-data"); 426 } 427 428 /** 429 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 430 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 431 * on it. 432 * @return a unique path in the test filesystem 433 */ 434 public Path getDataTestDirOnTestFS() throws IOException { 435 if (dataTestDirOnTestFS == null) { 436 setupDataTestDirOnTestFS(); 437 } 438 439 return dataTestDirOnTestFS; 440 } 441 442 /** 443 * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()} to write 444 * temporary test data. Call this method after setting up the mini dfs cluster if the test relies 445 * on it. 446 * @return a unique path in the test filesystem 447 * @param subdirName name of the subdir to create under the base test dir 448 */ 449 public Path getDataTestDirOnTestFS(final String subdirName) throws IOException { 450 return new Path(getDataTestDirOnTestFS(), subdirName); 451 } 452 453 /** 454 * Sets up a path in test filesystem to be used by tests. Creates a new directory if not already 455 * setup. 456 */ 457 private void setupDataTestDirOnTestFS() throws IOException { 458 if (dataTestDirOnTestFS != null) { 459 LOG.warn("Data test on test fs dir already setup in " + dataTestDirOnTestFS.toString()); 460 return; 461 } 462 dataTestDirOnTestFS = getNewDataTestDirOnTestFS(); 463 } 464 465 /** 466 * Sets up a new path in test filesystem to be used by tests. 467 */ 468 private Path getNewDataTestDirOnTestFS() throws IOException { 469 // The file system can be either local, mini dfs, or if the configuration 470 // is supplied externally, it can be an external cluster FS. If it is a local 471 // file system, the tests should use getBaseTestDir, otherwise, we can use 472 // the working directory, and create a unique sub dir there 473 FileSystem fs = getTestFileSystem(); 474 Path newDataTestDir; 475 String randomStr = getRandomUUID().toString(); 476 if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { 477 newDataTestDir = new Path(getDataTestDir(), randomStr); 478 File dataTestDir = new File(newDataTestDir.toString()); 479 if (deleteOnExit()) dataTestDir.deleteOnExit(); 480 } else { 481 Path base = getBaseTestDirOnTestFS(); 482 newDataTestDir = new Path(base, randomStr); 483 if (deleteOnExit()) fs.deleteOnExit(newDataTestDir); 484 } 485 return newDataTestDir; 486 } 487 488 /** 489 * Cleans the test data directory on the test filesystem. 490 * @return True if we removed the test dirs 491 */ 492 public boolean cleanupDataTestDirOnTestFS() throws IOException { 493 boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true); 494 if (ret) { 495 dataTestDirOnTestFS = null; 496 } 497 return ret; 498 } 499 500 /** 501 * Cleans a subdirectory under the test data directory on the test filesystem. 502 * @return True if we removed child 503 */ 504 public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException { 505 Path cpath = getDataTestDirOnTestFS(subdirName); 506 return getTestFileSystem().delete(cpath, true); 507 } 508 509 /** 510 * Start a minidfscluster. 511 * @param servers How many DNs to start. 512 * @see #shutdownMiniDFSCluster() 513 * @return The mini dfs cluster created. 514 */ 515 public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception { 516 return startMiniDFSCluster(servers, null); 517 } 518 519 /** 520 * Start a minidfscluster. This is useful if you want to run datanode on distinct hosts for things 521 * like HDFS block location verification. If you start MiniDFSCluster without host names, all 522 * instances of the datanodes will have the same host name. 523 * @param hosts hostnames DNs to run on. 524 * @see #shutdownMiniDFSCluster() 525 * @return The mini dfs cluster created. 526 */ 527 public MiniDFSCluster startMiniDFSCluster(final String[] hosts) throws Exception { 528 if (hosts != null && hosts.length != 0) { 529 return startMiniDFSCluster(hosts.length, hosts); 530 } else { 531 return startMiniDFSCluster(1, null); 532 } 533 } 534 535 /** 536 * Start a minidfscluster. Can only create one. 537 * @param servers How many DNs to start. 538 * @param hosts hostnames DNs to run on. 539 * @see #shutdownMiniDFSCluster() 540 * @return The mini dfs cluster created. 541 */ 542 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts) throws Exception { 543 return startMiniDFSCluster(servers, null, hosts); 544 } 545 546 private void setFs() throws IOException { 547 if (this.dfsCluster == null) { 548 LOG.info("Skipping setting fs because dfsCluster is null"); 549 return; 550 } 551 FileSystem fs = this.dfsCluster.getFileSystem(); 552 CommonFSUtils.setFsDefault(this.conf, new Path(fs.getUri())); 553 554 // re-enable this check with dfs 555 conf.unset(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE); 556 } 557 558 // Workaround to avoid IllegalThreadStateException 559 // See HBASE-27148 for more details 560 private static final class FsDatasetAsyncDiskServiceFixer extends Thread { 561 562 private volatile boolean stopped = false; 563 564 private final MiniDFSCluster cluster; 565 566 FsDatasetAsyncDiskServiceFixer(MiniDFSCluster cluster) { 567 super("FsDatasetAsyncDiskServiceFixer"); 568 setDaemon(true); 569 this.cluster = cluster; 570 } 571 572 @Override 573 public void run() { 574 while (!stopped) { 575 try { 576 Thread.sleep(30000); 577 } catch (InterruptedException e) { 578 Thread.currentThread().interrupt(); 579 continue; 580 } 581 // we could add new datanodes during tests, so here we will check every 30 seconds, as the 582 // timeout of the thread pool executor is 60 seconds by default. 583 try { 584 for (DataNode dn : cluster.getDataNodes()) { 585 FsDatasetSpi<?> dataset = dn.getFSDataset(); 586 Field service = dataset.getClass().getDeclaredField("asyncDiskService"); 587 service.setAccessible(true); 588 Object asyncDiskService = service.get(dataset); 589 Field group = asyncDiskService.getClass().getDeclaredField("threadGroup"); 590 group.setAccessible(true); 591 ThreadGroup threadGroup = (ThreadGroup) group.get(asyncDiskService); 592 if (threadGroup.isDaemon()) { 593 threadGroup.setDaemon(false); 594 } 595 } 596 } catch (NoSuchFieldException e) { 597 LOG.debug("NoSuchFieldException: " + e.getMessage() 598 + "; It might because your Hadoop version > 3.2.3 or 3.3.4, " 599 + "See HBASE-27595 for details."); 600 } catch (Exception e) { 601 LOG.warn("failed to reset thread pool timeout for FsDatasetAsyncDiskService", e); 602 } 603 } 604 } 605 606 void shutdown() { 607 stopped = true; 608 interrupt(); 609 } 610 } 611 612 public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts) 613 throws Exception { 614 createDirsAndSetProperties(); 615 EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); 616 617 this.dfsCluster = 618 new MiniDFSCluster(0, this.conf, servers, true, true, true, null, racks, hosts, null); 619 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 620 this.dfsClusterFixer.start(); 621 // Set this just-started cluster as our filesystem. 622 setFs(); 623 624 // Wait for the cluster to be totally up 625 this.dfsCluster.waitClusterUp(); 626 627 // reset the test directory for test file system 628 dataTestDirOnTestFS = null; 629 String dataTestDir = getDataTestDir().toString(); 630 conf.set(HConstants.HBASE_DIR, dataTestDir); 631 LOG.debug("Setting {} to {}", HConstants.HBASE_DIR, dataTestDir); 632 633 return this.dfsCluster; 634 } 635 636 public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException { 637 createDirsAndSetProperties(); 638 dfsCluster = 639 new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null); 640 this.dfsClusterFixer = new FsDatasetAsyncDiskServiceFixer(dfsCluster); 641 this.dfsClusterFixer.start(); 642 return dfsCluster; 643 } 644 645 /** 646 * This is used before starting HDFS and map-reduce mini-clusters Run something like the below to 647 * check for the likes of '/tmp' references -- i.e. references outside of the test data dir -- in 648 * the conf. 649 * 650 * <pre> 651 * Configuration conf = TEST_UTIL.getConfiguration(); 652 * for (Iterator<Map.Entry<String, String>> i = conf.iterator(); i.hasNext();) { 653 * Map.Entry<String, String> e = i.next(); 654 * assertFalse(e.getKey() + " " + e.getValue(), e.getValue().contains("/tmp")); 655 * } 656 * </pre> 657 */ 658 private void createDirsAndSetProperties() throws IOException { 659 setupClusterTestDir(); 660 conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, clusterTestDir.getCanonicalPath()); 661 createDirAndSetProperty("test.cache.data"); 662 createDirAndSetProperty("hadoop.tmp.dir"); 663 hadoopLogDir = createDirAndSetProperty("hadoop.log.dir"); 664 createDirAndSetProperty("mapreduce.cluster.local.dir"); 665 createDirAndSetProperty("mapreduce.cluster.temp.dir"); 666 enableShortCircuit(); 667 668 Path root = getDataTestDirOnTestFS("hadoop"); 669 conf.set(MapreduceTestingShim.getMROutputDirProp(), 670 new Path(root, "mapred-output-dir").toString()); 671 conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString()); 672 conf.set("mapreduce.jobtracker.staging.root.dir", 673 new Path(root, "mapreduce-jobtracker-staging-root-dir").toString()); 674 conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString()); 675 conf.set("yarn.app.mapreduce.am.staging-dir", 676 new Path(root, "mapreduce-am-staging-root-dir").toString()); 677 678 // Frustrate yarn's and hdfs's attempts at writing /tmp. 679 // Below is fragile. Make it so we just interpolate any 'tmp' reference. 680 createDirAndSetProperty("yarn.node-labels.fs-store.root-dir"); 681 createDirAndSetProperty("yarn.node-attribute.fs-store.root-dir"); 682 createDirAndSetProperty("yarn.nodemanager.log-dirs"); 683 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 684 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.active-dir"); 685 createDirAndSetProperty("yarn.timeline-service.entity-group-fs-store.done-dir"); 686 createDirAndSetProperty("yarn.nodemanager.remote-app-log-dir"); 687 createDirAndSetProperty("dfs.journalnode.edits.dir"); 688 createDirAndSetProperty("dfs.datanode.shared.file.descriptor.paths"); 689 createDirAndSetProperty("nfs.dump.dir"); 690 createDirAndSetProperty("java.io.tmpdir"); 691 createDirAndSetProperty("dfs.journalnode.edits.dir"); 692 createDirAndSetProperty("dfs.provided.aliasmap.inmemory.leveldb.dir"); 693 createDirAndSetProperty("fs.s3a.committer.staging.tmp.path"); 694 } 695 696 /** 697 * Check whether the tests should assume NEW_VERSION_BEHAVIOR when creating new column families. 698 * Default to false. 699 */ 700 public boolean isNewVersionBehaviorEnabled() { 701 final String propName = "hbase.tests.new.version.behavior"; 702 String v = System.getProperty(propName); 703 if (v != null) { 704 return Boolean.parseBoolean(v); 705 } 706 return false; 707 } 708 709 /** 710 * Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property. This 711 * allows to specify this parameter on the command line. If not set, default is true. 712 */ 713 public boolean isReadShortCircuitOn() { 714 final String propName = "hbase.tests.use.shortcircuit.reads"; 715 String readOnProp = System.getProperty(propName); 716 if (readOnProp != null) { 717 return Boolean.parseBoolean(readOnProp); 718 } else { 719 return conf.getBoolean(propName, false); 720 } 721 } 722 723 /** 724 * Enable the short circuit read, unless configured differently. Set both HBase and HDFS settings, 725 * including skipping the hdfs checksum checks. 726 */ 727 private void enableShortCircuit() { 728 if (isReadShortCircuitOn()) { 729 String curUser = System.getProperty("user.name"); 730 LOG.info("read short circuit is ON for user " + curUser); 731 // read short circuit, for hdfs 732 conf.set("dfs.block.local-path-access.user", curUser); 733 // read short circuit, for hbase 734 conf.setBoolean("dfs.client.read.shortcircuit", true); 735 // Skip checking checksum, for the hdfs client and the datanode 736 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true); 737 } else { 738 LOG.info("read short circuit is OFF"); 739 } 740 } 741 742 private String createDirAndSetProperty(final String property) { 743 return createDirAndSetProperty(property, property); 744 } 745 746 private String createDirAndSetProperty(final String relPath, String property) { 747 String path = getDataTestDir(relPath).toString(); 748 System.setProperty(property, path); 749 conf.set(property, path); 750 new File(path).mkdirs(); 751 LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); 752 return path; 753 } 754 755 /** 756 * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} or does nothing. 757 */ 758 public void shutdownMiniDFSCluster() throws IOException { 759 if (this.dfsCluster != null) { 760 // The below throws an exception per dn, AsynchronousCloseException. 761 this.dfsCluster.shutdown(); 762 dfsCluster = null; 763 // It is possible that the dfs cluster is set through setDFSCluster method, where we will not 764 // have a fixer 765 if (dfsClusterFixer != null) { 766 this.dfsClusterFixer.shutdown(); 767 dfsClusterFixer = null; 768 } 769 dataTestDirOnTestFS = null; 770 CommonFSUtils.setFsDefault(this.conf, new Path("file:///")); 771 } 772 } 773 774 /** 775 * Start up a minicluster of hbase, dfs and zookeeper clusters with given slave node number. All 776 * other options will use default values, defined in {@link StartTestingClusterOption.Builder}. 777 * @param numSlaves slave node number, for both HBase region server and HDFS data node. 778 * @see #startMiniCluster(StartTestingClusterOption option) 779 * @see #shutdownMiniDFSCluster() 780 */ 781 public SingleProcessHBaseCluster startMiniCluster(int numSlaves) throws Exception { 782 StartTestingClusterOption option = StartTestingClusterOption.builder() 783 .numRegionServers(numSlaves).numDataNodes(numSlaves).build(); 784 return startMiniCluster(option); 785 } 786 787 /** 788 * Start up a minicluster of hbase, dfs and zookeeper all using default options. Option default 789 * value can be found in {@link StartTestingClusterOption.Builder}. 790 * @see #startMiniCluster(StartTestingClusterOption option) 791 * @see #shutdownMiniDFSCluster() 792 */ 793 public SingleProcessHBaseCluster startMiniCluster() throws Exception { 794 return startMiniCluster(StartTestingClusterOption.builder().build()); 795 } 796 797 /** 798 * Start up a mini cluster of hbase, optionally dfs and zookeeper if needed. It modifies 799 * Configuration. It homes the cluster data directory under a random subdirectory in a directory 800 * under System property test.build.data, to be cleaned up on exit. 801 * @see #shutdownMiniDFSCluster() 802 */ 803 public SingleProcessHBaseCluster startMiniCluster(StartTestingClusterOption option) 804 throws Exception { 805 LOG.info("Starting up minicluster with option: {}", option); 806 807 // If we already put up a cluster, fail. 808 if (miniClusterRunning) { 809 throw new IllegalStateException("A mini-cluster is already running"); 810 } 811 miniClusterRunning = true; 812 813 setupClusterTestDir(); 814 815 // Bring up mini dfs cluster. This spews a bunch of warnings about missing 816 // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. 817 if (dfsCluster == null) { 818 LOG.info("STARTING DFS"); 819 dfsCluster = startMiniDFSCluster(option.getNumDataNodes(), option.getDataNodeHosts()); 820 } else { 821 LOG.info("NOT STARTING DFS"); 822 } 823 824 // Start up a zk cluster. 825 if (getZkCluster() == null) { 826 startMiniZKCluster(option.getNumZkServers()); 827 } 828 829 // Start the MiniHBaseCluster 830 return startMiniHBaseCluster(option); 831 } 832 833 /** 834 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 835 * {@link #startMiniCluster()}. This is useful when doing stepped startup of clusters. 836 * @return Reference to the hbase mini hbase cluster. 837 * @see #startMiniCluster(StartTestingClusterOption) 838 * @see #shutdownMiniHBaseCluster() 839 */ 840 public SingleProcessHBaseCluster startMiniHBaseCluster(StartTestingClusterOption option) 841 throws IOException, InterruptedException { 842 // Now do the mini hbase cluster. Set the hbase.rootdir in config. 843 createRootDir(option.isCreateRootDir()); 844 if (option.isCreateWALDir()) { 845 createWALRootDir(); 846 } 847 // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is 848 // for tests that do not read hbase-defaults.xml 849 setHBaseFsTmpDir(); 850 851 // These settings will make the server waits until this exact number of 852 // regions servers are connected. 853 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { 854 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, option.getNumRegionServers()); 855 } 856 if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) { 857 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, option.getNumRegionServers()); 858 } 859 860 Configuration c = new Configuration(this.conf); 861 this.hbaseCluster = new SingleProcessHBaseCluster(c, option.getNumMasters(), 862 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 863 option.getMasterClass(), option.getRsClass()); 864 // Populate the master address configuration from mini cluster configuration. 865 conf.set(HConstants.MASTER_ADDRS_KEY, MasterRegistry.getMasterAddr(c)); 866 // Don't leave here till we've done a successful scan of the hbase:meta 867 try (Table t = getConnection().getTable(TableName.META_TABLE_NAME); 868 ResultScanner s = t.getScanner(new Scan())) { 869 for (;;) { 870 if (s.next() == null) { 871 break; 872 } 873 } 874 } 875 876 getAdmin(); // create immediately the hbaseAdmin 877 LOG.info("Minicluster is up; activeMaster={}", getHBaseCluster().getMaster()); 878 879 return (SingleProcessHBaseCluster) hbaseCluster; 880 } 881 882 /** 883 * Starts up mini hbase cluster using default options. Default options can be found in 884 * {@link StartTestingClusterOption.Builder}. 885 * @see #startMiniHBaseCluster(StartTestingClusterOption) 886 * @see #shutdownMiniHBaseCluster() 887 */ 888 public SingleProcessHBaseCluster startMiniHBaseCluster() 889 throws IOException, InterruptedException { 890 return startMiniHBaseCluster(StartTestingClusterOption.builder().build()); 891 } 892 893 /** 894 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 895 * {@link #startMiniCluster()}. All other options will use default values, defined in 896 * {@link StartTestingClusterOption.Builder}. 897 * @param numMasters Master node number. 898 * @param numRegionServers Number of region servers. 899 * @return The mini HBase cluster created. 900 * @see #shutdownMiniHBaseCluster() 901 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 902 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 903 * @see #startMiniHBaseCluster(StartTestingClusterOption) 904 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 905 */ 906 @Deprecated 907 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers) 908 throws IOException, InterruptedException { 909 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 910 .numRegionServers(numRegionServers).build(); 911 return startMiniHBaseCluster(option); 912 } 913 914 /** 915 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 916 * {@link #startMiniCluster()}. All other options will use default values, defined in 917 * {@link StartTestingClusterOption.Builder}. 918 * @param numMasters Master node number. 919 * @param numRegionServers Number of region servers. 920 * @param rsPorts Ports that RegionServer should use. 921 * @return The mini HBase cluster created. 922 * @see #shutdownMiniHBaseCluster() 923 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 924 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 925 * @see #startMiniHBaseCluster(StartTestingClusterOption) 926 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 927 */ 928 @Deprecated 929 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 930 List<Integer> rsPorts) throws IOException, InterruptedException { 931 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 932 .numRegionServers(numRegionServers).rsPorts(rsPorts).build(); 933 return startMiniHBaseCluster(option); 934 } 935 936 /** 937 * Starts up mini hbase cluster. Usually you won't want this. You'll usually want 938 * {@link #startMiniCluster()}. All other options will use default values, defined in 939 * {@link StartTestingClusterOption.Builder}. 940 * @param numMasters Master node number. 941 * @param numRegionServers Number of region servers. 942 * @param rsPorts Ports that RegionServer should use. 943 * @param masterClass The class to use as HMaster, or null for default. 944 * @param rsClass The class to use as HRegionServer, or null for default. 945 * @param createRootDir Whether to create a new root or data directory path. 946 * @param createWALDir Whether to create a new WAL directory. 947 * @return The mini HBase cluster created. 948 * @see #shutdownMiniHBaseCluster() 949 * @deprecated since 2.2.0 and will be removed in 4.0.0. Use 950 * {@link #startMiniHBaseCluster(StartTestingClusterOption)} instead. 951 * @see #startMiniHBaseCluster(StartTestingClusterOption) 952 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21071">HBASE-21071</a> 953 */ 954 @Deprecated 955 public SingleProcessHBaseCluster startMiniHBaseCluster(int numMasters, int numRegionServers, 956 List<Integer> rsPorts, Class<? extends HMaster> masterClass, 957 Class<? extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer> rsClass, 958 boolean createRootDir, boolean createWALDir) throws IOException, InterruptedException { 959 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(numMasters) 960 .masterClass(masterClass).numRegionServers(numRegionServers).rsClass(rsClass).rsPorts(rsPorts) 961 .createRootDir(createRootDir).createWALDir(createWALDir).build(); 962 return startMiniHBaseCluster(option); 963 } 964 965 /** 966 * Starts the hbase cluster up again after shutting it down previously in a test. Use this if you 967 * want to keep dfs/zk up and just stop/start hbase. 968 * @param servers number of region servers 969 */ 970 public void restartHBaseCluster(int servers) throws IOException, InterruptedException { 971 this.restartHBaseCluster(servers, null); 972 } 973 974 public void restartHBaseCluster(int servers, List<Integer> ports) 975 throws IOException, InterruptedException { 976 StartTestingClusterOption option = 977 StartTestingClusterOption.builder().numRegionServers(servers).rsPorts(ports).build(); 978 restartHBaseCluster(option); 979 invalidateConnection(); 980 } 981 982 public void restartHBaseCluster(StartTestingClusterOption option) 983 throws IOException, InterruptedException { 984 closeConnection(); 985 this.hbaseCluster = new SingleProcessHBaseCluster(this.conf, option.getNumMasters(), 986 option.getNumAlwaysStandByMasters(), option.getNumRegionServers(), option.getRsPorts(), 987 option.getMasterClass(), option.getRsClass()); 988 // Don't leave here till we've done a successful scan of the hbase:meta 989 Connection conn = ConnectionFactory.createConnection(this.conf); 990 Table t = conn.getTable(TableName.META_TABLE_NAME); 991 ResultScanner s = t.getScanner(new Scan()); 992 while (s.next() != null) { 993 // do nothing 994 } 995 LOG.info("HBase has been restarted"); 996 s.close(); 997 t.close(); 998 conn.close(); 999 } 1000 1001 /** 1002 * Returns current mini hbase cluster. Only has something in it after a call to 1003 * {@link #startMiniCluster()}. 1004 * @see #startMiniCluster() 1005 */ 1006 public SingleProcessHBaseCluster getMiniHBaseCluster() { 1007 if (this.hbaseCluster == null || this.hbaseCluster instanceof SingleProcessHBaseCluster) { 1008 return (SingleProcessHBaseCluster) this.hbaseCluster; 1009 } 1010 throw new RuntimeException( 1011 hbaseCluster + " not an instance of " + SingleProcessHBaseCluster.class.getName()); 1012 } 1013 1014 /** 1015 * Stops mini hbase, zk, and hdfs clusters. 1016 * @see #startMiniCluster(int) 1017 */ 1018 public void shutdownMiniCluster() throws IOException { 1019 LOG.info("Shutting down minicluster"); 1020 shutdownMiniHBaseCluster(); 1021 shutdownMiniDFSCluster(); 1022 shutdownMiniZKCluster(); 1023 1024 cleanupTestDir(); 1025 miniClusterRunning = false; 1026 LOG.info("Minicluster is down"); 1027 } 1028 1029 /** 1030 * Shutdown HBase mini cluster.Does not shutdown zk or dfs if running. 1031 * @throws java.io.IOException in case command is unsuccessful 1032 */ 1033 public void shutdownMiniHBaseCluster() throws IOException { 1034 cleanup(); 1035 if (this.hbaseCluster != null) { 1036 this.hbaseCluster.shutdown(); 1037 // Wait till hbase is down before going on to shutdown zk. 1038 this.hbaseCluster.waitUntilShutDown(); 1039 this.hbaseCluster = null; 1040 } 1041 if (zooKeeperWatcher != null) { 1042 zooKeeperWatcher.close(); 1043 zooKeeperWatcher = null; 1044 } 1045 } 1046 1047 /** 1048 * Abruptly Shutdown HBase mini cluster. Does not shutdown zk or dfs if running. 1049 * @throws java.io.IOException throws in case command is unsuccessful 1050 */ 1051 public void killMiniHBaseCluster() throws IOException { 1052 cleanup(); 1053 if (this.hbaseCluster != null) { 1054 getMiniHBaseCluster().killAll(); 1055 this.hbaseCluster = null; 1056 } 1057 if (zooKeeperWatcher != null) { 1058 zooKeeperWatcher.close(); 1059 zooKeeperWatcher = null; 1060 } 1061 } 1062 1063 // close hbase admin, close current connection and reset MIN MAX configs for RS. 1064 private void cleanup() throws IOException { 1065 closeConnection(); 1066 // unset the configuration for MIN and MAX RS to start 1067 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 1068 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1); 1069 } 1070 1071 /** 1072 * Returns the path to the default root dir the minicluster uses. If <code>create</code> is true, 1073 * a new root directory path is fetched irrespective of whether it has been fetched before or not. 1074 * If false, previous path is used. Note: this does not cause the root dir to be created. 1075 * @return Fully qualified path for the default hbase root dir 1076 */ 1077 public Path getDefaultRootDirPath(boolean create) throws IOException { 1078 if (!create) { 1079 return getDataTestDirOnTestFS(); 1080 } else { 1081 return getNewDataTestDirOnTestFS(); 1082 } 1083 } 1084 1085 /** 1086 * Same as {{@link HBaseTestingUtil#getDefaultRootDirPath(boolean create)} except that 1087 * <code>create</code> flag is false. Note: this does not cause the root dir to be created. 1088 * @return Fully qualified path for the default hbase root dir 1089 */ 1090 public Path getDefaultRootDirPath() throws IOException { 1091 return getDefaultRootDirPath(false); 1092 } 1093 1094 /** 1095 * Creates an hbase rootdir in user home directory. Also creates hbase version file. Normally you 1096 * won't make use of this method. Root hbasedir is created for you as part of mini cluster 1097 * startup. You'd only use this method if you were doing manual operation. 1098 * @param create This flag decides whether to get a new root or data directory path or not, if it 1099 * has been fetched already. Note : Directory will be made irrespective of whether 1100 * path has been fetched or not. If directory already exists, it will be overwritten 1101 * @return Fully qualified path to hbase root dir 1102 */ 1103 public Path createRootDir(boolean create) throws IOException { 1104 FileSystem fs = FileSystem.get(this.conf); 1105 Path hbaseRootdir = getDefaultRootDirPath(create); 1106 CommonFSUtils.setRootDir(this.conf, hbaseRootdir); 1107 fs.mkdirs(hbaseRootdir); 1108 FSUtils.setVersion(fs, hbaseRootdir); 1109 return hbaseRootdir; 1110 } 1111 1112 /** 1113 * Same as {@link HBaseTestingUtil#createRootDir(boolean create)} except that <code>create</code> 1114 * flag is false. 1115 * @return Fully qualified path to hbase root dir 1116 */ 1117 public Path createRootDir() throws IOException { 1118 return createRootDir(false); 1119 } 1120 1121 /** 1122 * Creates a hbase walDir in the user's home directory. Normally you won't make use of this 1123 * method. Root hbaseWALDir is created for you as part of mini cluster startup. You'd only use 1124 * this method if you were doing manual operation. 1125 * @return Fully qualified path to hbase root dir 1126 */ 1127 public Path createWALRootDir() throws IOException { 1128 FileSystem fs = FileSystem.get(this.conf); 1129 Path walDir = getNewDataTestDirOnTestFS(); 1130 CommonFSUtils.setWALRootDir(this.conf, walDir); 1131 fs.mkdirs(walDir); 1132 return walDir; 1133 } 1134 1135 private void setHBaseFsTmpDir() throws IOException { 1136 String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir"); 1137 if (hbaseFsTmpDirInString == null) { 1138 this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString()); 1139 LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir")); 1140 } else { 1141 LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString); 1142 } 1143 } 1144 1145 /** 1146 * Flushes all caches in the mini hbase cluster 1147 */ 1148 public void flush() throws IOException { 1149 getMiniHBaseCluster().flushcache(); 1150 } 1151 1152 /** 1153 * Flushes all caches in the mini hbase cluster 1154 */ 1155 public void flush(TableName tableName) throws IOException { 1156 getMiniHBaseCluster().flushcache(tableName); 1157 } 1158 1159 /** 1160 * Compact all regions in the mini hbase cluster 1161 */ 1162 public void compact(boolean major) throws IOException { 1163 getMiniHBaseCluster().compact(major); 1164 } 1165 1166 /** 1167 * Compact all of a table's reagion in the mini hbase cluster 1168 */ 1169 public void compact(TableName tableName, boolean major) throws IOException { 1170 getMiniHBaseCluster().compact(tableName, major); 1171 } 1172 1173 /** 1174 * Create a table. 1175 * @return A Table instance for the created table. 1176 */ 1177 public Table createTable(TableName tableName, String family) throws IOException { 1178 return createTable(tableName, new String[] { family }); 1179 } 1180 1181 /** 1182 * Create a table. 1183 * @return A Table instance for the created table. 1184 */ 1185 public Table createTable(TableName tableName, String[] families) throws IOException { 1186 List<byte[]> fams = new ArrayList<>(families.length); 1187 for (String family : families) { 1188 fams.add(Bytes.toBytes(family)); 1189 } 1190 return createTable(tableName, fams.toArray(new byte[0][])); 1191 } 1192 1193 /** 1194 * Create a table. 1195 * @return A Table instance for the created table. 1196 */ 1197 public Table createTable(TableName tableName, byte[] family) throws IOException { 1198 return createTable(tableName, new byte[][] { family }); 1199 } 1200 1201 /** 1202 * Create a table with multiple regions. 1203 * @return A Table instance for the created table. 1204 */ 1205 public Table createMultiRegionTable(TableName tableName, byte[] family, int numRegions) 1206 throws IOException { 1207 if (numRegions < 3) throw new IOException("Must create at least 3 regions"); 1208 byte[] startKey = Bytes.toBytes("aaaaa"); 1209 byte[] endKey = Bytes.toBytes("zzzzz"); 1210 byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); 1211 1212 return createTable(tableName, new byte[][] { family }, splitKeys); 1213 } 1214 1215 /** 1216 * Create a table. 1217 * @return A Table instance for the created table. 1218 */ 1219 public Table createTable(TableName tableName, byte[][] families) throws IOException { 1220 return createTable(tableName, families, (byte[][]) null); 1221 } 1222 1223 /** 1224 * Create a table with multiple regions. 1225 * @return A Table instance for the created table. 1226 */ 1227 public Table createMultiRegionTable(TableName tableName, byte[][] families) throws IOException { 1228 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE); 1229 } 1230 1231 /** 1232 * Create a table with multiple regions. 1233 * @param replicaCount replica count. 1234 * @return A Table instance for the created table. 1235 */ 1236 public Table createMultiRegionTable(TableName tableName, int replicaCount, byte[][] families) 1237 throws IOException { 1238 return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE, replicaCount); 1239 } 1240 1241 /** 1242 * Create a table. 1243 * @return A Table instance for the created table. 1244 */ 1245 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys) 1246 throws IOException { 1247 return createTable(tableName, families, splitKeys, 1, new Configuration(getConfiguration())); 1248 } 1249 1250 /** 1251 * Create a table. 1252 * @param tableName the table name 1253 * @param families the families 1254 * @param splitKeys the splitkeys 1255 * @param replicaCount the region replica count 1256 * @return A Table instance for the created table. 1257 * @throws IOException throws IOException 1258 */ 1259 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1260 int replicaCount) throws IOException { 1261 return createTable(tableName, families, splitKeys, replicaCount, 1262 new Configuration(getConfiguration())); 1263 } 1264 1265 public Table createTable(TableName tableName, byte[][] families, int numVersions, byte[] startKey, 1266 byte[] endKey, int numRegions) throws IOException { 1267 TableDescriptor desc = createTableDescriptor(tableName, families, numVersions); 1268 1269 getAdmin().createTable(desc, startKey, endKey, numRegions); 1270 // HBaseAdmin only waits for regions to appear in hbase:meta we 1271 // should wait until they are assigned 1272 waitUntilAllRegionsAssigned(tableName); 1273 return getConnection().getTable(tableName); 1274 } 1275 1276 /** 1277 * Create a table. 1278 * @param c Configuration to use 1279 * @return A Table instance for the created table. 1280 */ 1281 public Table createTable(TableDescriptor htd, byte[][] families, Configuration c) 1282 throws IOException { 1283 return createTable(htd, families, null, c); 1284 } 1285 1286 /** 1287 * Create a table. 1288 * @param htd table descriptor 1289 * @param families array of column families 1290 * @param splitKeys array of split keys 1291 * @param c Configuration to use 1292 * @return A Table instance for the created table. 1293 * @throws IOException if getAdmin or createTable fails 1294 */ 1295 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1296 Configuration c) throws IOException { 1297 // Disable blooms (they are on by default as of 0.95) but we disable them here because 1298 // tests have hard coded counts of what to expect in block cache, etc., and blooms being 1299 // on is interfering. 1300 return createTable(htd, families, splitKeys, BloomType.NONE, HConstants.DEFAULT_BLOCKSIZE, c); 1301 } 1302 1303 /** 1304 * Create a table. 1305 * @param htd table descriptor 1306 * @param families array of column families 1307 * @param splitKeys array of split keys 1308 * @param type Bloom type 1309 * @param blockSize block size 1310 * @param c Configuration to use 1311 * @return A Table instance for the created table. 1312 * @throws IOException if getAdmin or createTable fails 1313 */ 1314 1315 public Table createTable(TableDescriptor htd, byte[][] families, byte[][] splitKeys, 1316 BloomType type, int blockSize, Configuration c) throws IOException { 1317 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1318 for (byte[] family : families) { 1319 ColumnFamilyDescriptorBuilder cfdb = ColumnFamilyDescriptorBuilder.newBuilder(family) 1320 .setBloomFilterType(type).setBlocksize(blockSize); 1321 if (isNewVersionBehaviorEnabled()) { 1322 cfdb.setNewVersionBehavior(true); 1323 } 1324 builder.setColumnFamily(cfdb.build()); 1325 } 1326 TableDescriptor td = builder.build(); 1327 if (splitKeys != null) { 1328 getAdmin().createTable(td, splitKeys); 1329 } else { 1330 getAdmin().createTable(td); 1331 } 1332 // HBaseAdmin only waits for regions to appear in hbase:meta 1333 // we should wait until they are assigned 1334 waitUntilAllRegionsAssigned(td.getTableName()); 1335 return getConnection().getTable(td.getTableName()); 1336 } 1337 1338 /** 1339 * Create a table. 1340 * @param htd table descriptor 1341 * @param splitRows array of split keys 1342 * @return A Table instance for the created table. 1343 */ 1344 public Table createTable(TableDescriptor htd, byte[][] splitRows) throws IOException { 1345 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd); 1346 if (isNewVersionBehaviorEnabled()) { 1347 for (ColumnFamilyDescriptor family : htd.getColumnFamilies()) { 1348 builder.setColumnFamily( 1349 ColumnFamilyDescriptorBuilder.newBuilder(family).setNewVersionBehavior(true).build()); 1350 } 1351 } 1352 if (splitRows != null) { 1353 getAdmin().createTable(builder.build(), splitRows); 1354 } else { 1355 getAdmin().createTable(builder.build()); 1356 } 1357 // HBaseAdmin only waits for regions to appear in hbase:meta 1358 // we should wait until they are assigned 1359 waitUntilAllRegionsAssigned(htd.getTableName()); 1360 return getConnection().getTable(htd.getTableName()); 1361 } 1362 1363 /** 1364 * Create a table. 1365 * @param tableName the table name 1366 * @param families the families 1367 * @param splitKeys the split keys 1368 * @param replicaCount the replica count 1369 * @param c Configuration to use 1370 * @return A Table instance for the created table. 1371 */ 1372 public Table createTable(TableName tableName, byte[][] families, byte[][] splitKeys, 1373 int replicaCount, final Configuration c) throws IOException { 1374 TableDescriptor htd = 1375 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(replicaCount).build(); 1376 return createTable(htd, families, splitKeys, c); 1377 } 1378 1379 /** 1380 * Create a table. 1381 * @return A Table instance for the created table. 1382 */ 1383 public Table createTable(TableName tableName, byte[] family, int numVersions) throws IOException { 1384 return createTable(tableName, new byte[][] { family }, numVersions); 1385 } 1386 1387 /** 1388 * Create a table. 1389 * @return A Table instance for the created table. 1390 */ 1391 public Table createTable(TableName tableName, byte[][] families, int numVersions) 1392 throws IOException { 1393 return createTable(tableName, families, numVersions, (byte[][]) null); 1394 } 1395 1396 /** 1397 * Create a table. 1398 * @return A Table instance for the created table. 1399 */ 1400 public Table createTable(TableName tableName, byte[][] families, int numVersions, 1401 byte[][] splitKeys) throws IOException { 1402 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1403 for (byte[] family : families) { 1404 ColumnFamilyDescriptorBuilder cfBuilder = 1405 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions); 1406 if (isNewVersionBehaviorEnabled()) { 1407 cfBuilder.setNewVersionBehavior(true); 1408 } 1409 builder.setColumnFamily(cfBuilder.build()); 1410 } 1411 if (splitKeys != null) { 1412 getAdmin().createTable(builder.build(), splitKeys); 1413 } else { 1414 getAdmin().createTable(builder.build()); 1415 } 1416 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1417 // assigned 1418 waitUntilAllRegionsAssigned(tableName); 1419 return getConnection().getTable(tableName); 1420 } 1421 1422 /** 1423 * Create a table with multiple regions. 1424 * @return A Table instance for the created table. 1425 */ 1426 public Table createMultiRegionTable(TableName tableName, byte[][] families, int numVersions) 1427 throws IOException { 1428 return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE); 1429 } 1430 1431 /** 1432 * Create a table. 1433 * @return A Table instance for the created table. 1434 */ 1435 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize) 1436 throws IOException { 1437 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1438 for (byte[] family : families) { 1439 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1440 .setMaxVersions(numVersions).setBlocksize(blockSize); 1441 if (isNewVersionBehaviorEnabled()) { 1442 cfBuilder.setNewVersionBehavior(true); 1443 } 1444 builder.setColumnFamily(cfBuilder.build()); 1445 } 1446 getAdmin().createTable(builder.build()); 1447 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1448 // assigned 1449 waitUntilAllRegionsAssigned(tableName); 1450 return getConnection().getTable(tableName); 1451 } 1452 1453 public Table createTable(TableName tableName, byte[][] families, int numVersions, int blockSize, 1454 String cpName) throws IOException { 1455 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1456 for (byte[] family : families) { 1457 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family) 1458 .setMaxVersions(numVersions).setBlocksize(blockSize); 1459 if (isNewVersionBehaviorEnabled()) { 1460 cfBuilder.setNewVersionBehavior(true); 1461 } 1462 builder.setColumnFamily(cfBuilder.build()); 1463 } 1464 if (cpName != null) { 1465 builder.setCoprocessor(cpName); 1466 } 1467 getAdmin().createTable(builder.build()); 1468 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1469 // assigned 1470 waitUntilAllRegionsAssigned(tableName); 1471 return getConnection().getTable(tableName); 1472 } 1473 1474 /** 1475 * Create a table. 1476 * @return A Table instance for the created table. 1477 */ 1478 public Table createTable(TableName tableName, byte[][] families, int[] numVersions) 1479 throws IOException { 1480 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1481 int i = 0; 1482 for (byte[] family : families) { 1483 ColumnFamilyDescriptorBuilder cfBuilder = 1484 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(numVersions[i]); 1485 if (isNewVersionBehaviorEnabled()) { 1486 cfBuilder.setNewVersionBehavior(true); 1487 } 1488 builder.setColumnFamily(cfBuilder.build()); 1489 i++; 1490 } 1491 getAdmin().createTable(builder.build()); 1492 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1493 // assigned 1494 waitUntilAllRegionsAssigned(tableName); 1495 return getConnection().getTable(tableName); 1496 } 1497 1498 /** 1499 * Create a table. 1500 * @return A Table instance for the created table. 1501 */ 1502 public Table createTable(TableName tableName, byte[] family, byte[][] splitRows) 1503 throws IOException { 1504 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1505 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1506 if (isNewVersionBehaviorEnabled()) { 1507 cfBuilder.setNewVersionBehavior(true); 1508 } 1509 builder.setColumnFamily(cfBuilder.build()); 1510 getAdmin().createTable(builder.build(), splitRows); 1511 // HBaseAdmin only waits for regions to appear in hbase:meta we should wait until they are 1512 // assigned 1513 waitUntilAllRegionsAssigned(tableName); 1514 return getConnection().getTable(tableName); 1515 } 1516 1517 /** 1518 * Create a table with multiple regions. 1519 * @return A Table instance for the created table. 1520 */ 1521 public Table createMultiRegionTable(TableName tableName, byte[] family) throws IOException { 1522 return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE); 1523 } 1524 1525 /** 1526 * Set the number of Region replicas. 1527 */ 1528 public static void setReplicas(Admin admin, TableName table, int replicaCount) 1529 throws IOException, InterruptedException { 1530 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table)) 1531 .setRegionReplication(replicaCount).build(); 1532 admin.modifyTable(desc); 1533 } 1534 1535 /** 1536 * Set the number of Region replicas. 1537 */ 1538 public static void setReplicas(AsyncAdmin admin, TableName table, int replicaCount) 1539 throws ExecutionException, IOException, InterruptedException { 1540 TableDescriptor desc = TableDescriptorBuilder.newBuilder(admin.getDescriptor(table).get()) 1541 .setRegionReplication(replicaCount).build(); 1542 admin.modifyTable(desc).get(); 1543 } 1544 1545 /** 1546 * Drop an existing table 1547 * @param tableName existing table 1548 */ 1549 public void deleteTable(TableName tableName) throws IOException { 1550 try { 1551 getAdmin().disableTable(tableName); 1552 } catch (TableNotEnabledException e) { 1553 LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); 1554 } 1555 getAdmin().deleteTable(tableName); 1556 } 1557 1558 /** 1559 * Drop an existing table 1560 * @param tableName existing table 1561 */ 1562 public void deleteTableIfAny(TableName tableName) throws IOException { 1563 try { 1564 deleteTable(tableName); 1565 } catch (TableNotFoundException e) { 1566 // ignore 1567 } 1568 } 1569 1570 // ========================================================================== 1571 // Canned table and table descriptor creation 1572 1573 public final static byte[] fam1 = Bytes.toBytes("colfamily11"); 1574 public final static byte[] fam2 = Bytes.toBytes("colfamily21"); 1575 public final static byte[] fam3 = Bytes.toBytes("colfamily31"); 1576 public static final byte[][] COLUMNS = { fam1, fam2, fam3 }; 1577 private static final int MAXVERSIONS = 3; 1578 1579 public static final char FIRST_CHAR = 'a'; 1580 public static final char LAST_CHAR = 'z'; 1581 public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR }; 1582 public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET); 1583 1584 public TableDescriptorBuilder createModifyableTableDescriptor(final String name) { 1585 return createModifyableTableDescriptor(TableName.valueOf(name), 1586 ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER, 1587 ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1588 } 1589 1590 public TableDescriptor createTableDescriptor(final TableName name, final int minVersions, 1591 final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1592 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1593 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1594 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1595 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1596 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1597 if (isNewVersionBehaviorEnabled()) { 1598 cfBuilder.setNewVersionBehavior(true); 1599 } 1600 builder.setColumnFamily(cfBuilder.build()); 1601 } 1602 return builder.build(); 1603 } 1604 1605 public TableDescriptorBuilder createModifyableTableDescriptor(final TableName name, 1606 final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) { 1607 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 1608 for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) { 1609 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName) 1610 .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted) 1611 .setBlockCacheEnabled(false).setTimeToLive(ttl); 1612 if (isNewVersionBehaviorEnabled()) { 1613 cfBuilder.setNewVersionBehavior(true); 1614 } 1615 builder.setColumnFamily(cfBuilder.build()); 1616 } 1617 return builder; 1618 } 1619 1620 /** 1621 * Create a table of name <code>name</code>. 1622 * @param name Name to give table. 1623 * @return Column descriptor. 1624 */ 1625 public TableDescriptor createTableDescriptor(final TableName name) { 1626 return createTableDescriptor(name, ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 1627 MAXVERSIONS, HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED); 1628 } 1629 1630 public TableDescriptor createTableDescriptor(final TableName tableName, byte[] family) { 1631 return createTableDescriptor(tableName, new byte[][] { family }, 1); 1632 } 1633 1634 public TableDescriptor createTableDescriptor(final TableName tableName, byte[][] families, 1635 int maxVersions) { 1636 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1637 for (byte[] family : families) { 1638 ColumnFamilyDescriptorBuilder cfBuilder = 1639 ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersions); 1640 if (isNewVersionBehaviorEnabled()) { 1641 cfBuilder.setNewVersionBehavior(true); 1642 } 1643 builder.setColumnFamily(cfBuilder.build()); 1644 } 1645 return builder.build(); 1646 } 1647 1648 /** 1649 * Create an HRegion that writes to the local tmp dirs 1650 * @param desc a table descriptor indicating which table the region belongs to 1651 * @param startKey the start boundary of the region 1652 * @param endKey the end boundary of the region 1653 * @return a region that writes to local dir for testing 1654 */ 1655 public HRegion createLocalHRegion(TableDescriptor desc, byte[] startKey, byte[] endKey) 1656 throws IOException { 1657 RegionInfo hri = RegionInfoBuilder.newBuilder(desc.getTableName()).setStartKey(startKey) 1658 .setEndKey(endKey).build(); 1659 return createLocalHRegion(hri, desc); 1660 } 1661 1662 /** 1663 * Create an HRegion that writes to the local tmp dirs. Creates the WAL for you. Be sure to call 1664 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} when you're finished with it. 1665 */ 1666 public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws IOException { 1667 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), desc); 1668 } 1669 1670 /** 1671 * Create an HRegion that writes to the local tmp dirs with specified wal 1672 * @param info regioninfo 1673 * @param conf configuration 1674 * @param desc table descriptor 1675 * @param wal wal for this region. 1676 * @return created hregion 1677 */ 1678 public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc, 1679 WAL wal) throws IOException { 1680 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 1681 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 1682 return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal); 1683 } 1684 1685 /** 1686 * @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} 1687 * when done. 1688 */ 1689 public HRegion createLocalHRegion(TableName tableName, byte[] startKey, byte[] stopKey, 1690 Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families) 1691 throws IOException { 1692 return createLocalHRegionWithInMemoryFlags(tableName, startKey, stopKey, conf, isReadOnly, 1693 durability, wal, null, families); 1694 } 1695 1696 public HRegion createLocalHRegionWithInMemoryFlags(TableName tableName, byte[] startKey, 1697 byte[] stopKey, Configuration conf, boolean isReadOnly, Durability durability, WAL wal, 1698 boolean[] compactedMemStore, byte[]... families) throws IOException { 1699 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 1700 builder.setReadOnly(isReadOnly); 1701 int i = 0; 1702 for (byte[] family : families) { 1703 ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family); 1704 if (compactedMemStore != null && i < compactedMemStore.length) { 1705 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.BASIC); 1706 } else { 1707 cfBuilder.setInMemoryCompaction(MemoryCompactionPolicy.NONE); 1708 1709 } 1710 i++; 1711 // Set default to be three versions. 1712 cfBuilder.setMaxVersions(Integer.MAX_VALUE); 1713 builder.setColumnFamily(cfBuilder.build()); 1714 } 1715 builder.setDurability(durability); 1716 RegionInfo info = 1717 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build(); 1718 return createLocalHRegion(info, conf, builder.build(), wal); 1719 } 1720 1721 // 1722 // ========================================================================== 1723 1724 /** 1725 * Provide an existing table name to truncate. Scans the table and issues a delete for each row 1726 * read. 1727 * @param tableName existing table 1728 * @return HTable to that new table 1729 */ 1730 public Table deleteTableData(TableName tableName) throws IOException { 1731 Table table = getConnection().getTable(tableName); 1732 Scan scan = new Scan(); 1733 ResultScanner resScan = table.getScanner(scan); 1734 for (Result res : resScan) { 1735 Delete del = new Delete(res.getRow()); 1736 table.delete(del); 1737 } 1738 resScan = table.getScanner(scan); 1739 resScan.close(); 1740 return table; 1741 } 1742 1743 /** 1744 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1745 * table. 1746 * @param tableName table which must exist. 1747 * @param preserveRegions keep the existing split points 1748 * @return HTable for the new table 1749 */ 1750 public Table truncateTable(final TableName tableName, final boolean preserveRegions) 1751 throws IOException { 1752 Admin admin = getAdmin(); 1753 if (!admin.isTableDisabled(tableName)) { 1754 admin.disableTable(tableName); 1755 } 1756 admin.truncateTable(tableName, preserveRegions); 1757 return getConnection().getTable(tableName); 1758 } 1759 1760 /** 1761 * Truncate a table using the admin command. Effectively disables, deletes, and recreates the 1762 * table. For previous behavior of issuing row deletes, see deleteTableData. Expressly does not 1763 * preserve regions of existing table. 1764 * @param tableName table which must exist. 1765 * @return HTable for the new table 1766 */ 1767 public Table truncateTable(final TableName tableName) throws IOException { 1768 return truncateTable(tableName, false); 1769 } 1770 1771 /** 1772 * Load table with rows from 'aaa' to 'zzz'. 1773 * @param t Table 1774 * @param f Family 1775 * @return Count of rows loaded. 1776 */ 1777 public int loadTable(final Table t, final byte[] f) throws IOException { 1778 return loadTable(t, new byte[][] { f }); 1779 } 1780 1781 /** 1782 * Load table with rows from 'aaa' to 'zzz'. 1783 * @param t Table 1784 * @param f Family 1785 * @return Count of rows loaded. 1786 */ 1787 public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException { 1788 return loadTable(t, new byte[][] { f }, null, writeToWAL); 1789 } 1790 1791 /** 1792 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1793 * @param t Table 1794 * @param f Array of Families to load 1795 * @return Count of rows loaded. 1796 */ 1797 public int loadTable(final Table t, final byte[][] f) throws IOException { 1798 return loadTable(t, f, null); 1799 } 1800 1801 /** 1802 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1803 * @param t Table 1804 * @param f Array of Families to load 1805 * @param value the values of the cells. If null is passed, the row key is used as value 1806 * @return Count of rows loaded. 1807 */ 1808 public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException { 1809 return loadTable(t, f, value, true); 1810 } 1811 1812 /** 1813 * Load table of multiple column families with rows from 'aaa' to 'zzz'. 1814 * @param t Table 1815 * @param f Array of Families to load 1816 * @param value the values of the cells. If null is passed, the row key is used as value 1817 * @return Count of rows loaded. 1818 */ 1819 public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) 1820 throws IOException { 1821 List<Put> puts = new ArrayList<>(); 1822 for (byte[] row : HBaseTestingUtil.ROWS) { 1823 Put put = new Put(row); 1824 put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); 1825 for (int i = 0; i < f.length; i++) { 1826 byte[] value1 = value != null ? value : row; 1827 put.addColumn(f[i], f[i], value1); 1828 } 1829 puts.add(put); 1830 } 1831 t.put(puts); 1832 return puts.size(); 1833 } 1834 1835 /** 1836 * A tracker for tracking and validating table rows generated with 1837 * {@link HBaseTestingUtil#loadTable(Table, byte[])} 1838 */ 1839 public static class SeenRowTracker { 1840 int dim = 'z' - 'a' + 1; 1841 int[][][] seenRows = new int[dim][dim][dim]; // count of how many times the row is seen 1842 byte[] startRow; 1843 byte[] stopRow; 1844 1845 public SeenRowTracker(byte[] startRow, byte[] stopRow) { 1846 this.startRow = startRow; 1847 this.stopRow = stopRow; 1848 } 1849 1850 void reset() { 1851 for (byte[] row : ROWS) { 1852 seenRows[i(row[0])][i(row[1])][i(row[2])] = 0; 1853 } 1854 } 1855 1856 int i(byte b) { 1857 return b - 'a'; 1858 } 1859 1860 public void addRow(byte[] row) { 1861 seenRows[i(row[0])][i(row[1])][i(row[2])]++; 1862 } 1863 1864 /** 1865 * Validate that all the rows between startRow and stopRow are seen exactly once, and all other 1866 * rows none 1867 */ 1868 public void validate() { 1869 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1870 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1871 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1872 int count = seenRows[i(b1)][i(b2)][i(b3)]; 1873 int expectedCount = 0; 1874 if ( 1875 Bytes.compareTo(new byte[] { b1, b2, b3 }, startRow) >= 0 1876 && Bytes.compareTo(new byte[] { b1, b2, b3 }, stopRow) < 0 1877 ) { 1878 expectedCount = 1; 1879 } 1880 if (count != expectedCount) { 1881 String row = new String(new byte[] { b1, b2, b3 }, StandardCharsets.UTF_8); 1882 throw new RuntimeException("Row:" + row + " has a seen count of " + count + " " 1883 + "instead of " + expectedCount); 1884 } 1885 } 1886 } 1887 } 1888 } 1889 } 1890 1891 public int loadRegion(final HRegion r, final byte[] f) throws IOException { 1892 return loadRegion(r, f, false); 1893 } 1894 1895 public int loadRegion(final Region r, final byte[] f) throws IOException { 1896 return loadRegion((HRegion) r, f); 1897 } 1898 1899 /** 1900 * Load region with rows from 'aaa' to 'zzz'. 1901 * @param r Region 1902 * @param f Family 1903 * @param flush flush the cache if true 1904 * @return Count of rows loaded. 1905 */ 1906 public int loadRegion(final HRegion r, final byte[] f, final boolean flush) throws IOException { 1907 byte[] k = new byte[3]; 1908 int rowCount = 0; 1909 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 1910 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 1911 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 1912 k[0] = b1; 1913 k[1] = b2; 1914 k[2] = b3; 1915 Put put = new Put(k); 1916 put.setDurability(Durability.SKIP_WAL); 1917 put.addColumn(f, null, k); 1918 if (r.getWAL() == null) { 1919 put.setDurability(Durability.SKIP_WAL); 1920 } 1921 int preRowCount = rowCount; 1922 int pause = 10; 1923 int maxPause = 1000; 1924 while (rowCount == preRowCount) { 1925 try { 1926 r.put(put); 1927 rowCount++; 1928 } catch (RegionTooBusyException e) { 1929 pause = (pause * 2 >= maxPause) ? maxPause : pause * 2; 1930 Threads.sleep(pause); 1931 } 1932 } 1933 } 1934 } 1935 if (flush) { 1936 r.flush(true); 1937 } 1938 } 1939 return rowCount; 1940 } 1941 1942 public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow) 1943 throws IOException { 1944 for (int i = startRow; i < endRow; i++) { 1945 byte[] data = Bytes.toBytes(String.valueOf(i)); 1946 Put put = new Put(data); 1947 put.addColumn(f, null, data); 1948 t.put(put); 1949 } 1950 } 1951 1952 public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) 1953 throws IOException { 1954 for (int i = 0; i < totalRows; i++) { 1955 byte[] row = new byte[rowSize]; 1956 Bytes.random(row); 1957 Put put = new Put(row); 1958 put.addColumn(f, new byte[] { 0 }, new byte[] { 0 }); 1959 t.put(put); 1960 } 1961 } 1962 1963 public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, 1964 int replicaId) throws IOException { 1965 for (int i = startRow; i < endRow; i++) { 1966 String failMsg = "Failed verification of row :" + i; 1967 byte[] data = Bytes.toBytes(String.valueOf(i)); 1968 Get get = new Get(data); 1969 get.setReplicaId(replicaId); 1970 get.setConsistency(Consistency.TIMELINE); 1971 Result result = table.get(get); 1972 assertTrue(failMsg, result.containsColumn(f, null)); 1973 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 1974 Cell cell = result.getColumnLatestCell(f, null); 1975 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 1976 cell.getValueOffset(), cell.getValueLength())); 1977 } 1978 } 1979 1980 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow) 1981 throws IOException { 1982 verifyNumericRows((HRegion) region, f, startRow, endRow); 1983 } 1984 1985 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow) 1986 throws IOException { 1987 verifyNumericRows(region, f, startRow, endRow, true); 1988 } 1989 1990 public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow, 1991 final boolean present) throws IOException { 1992 verifyNumericRows((HRegion) region, f, startRow, endRow, present); 1993 } 1994 1995 public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow, 1996 final boolean present) throws IOException { 1997 for (int i = startRow; i < endRow; i++) { 1998 String failMsg = "Failed verification of row :" + i; 1999 byte[] data = Bytes.toBytes(String.valueOf(i)); 2000 Result result = region.get(new Get(data)); 2001 2002 boolean hasResult = result != null && !result.isEmpty(); 2003 assertEquals(failMsg + result, present, hasResult); 2004 if (!present) continue; 2005 2006 assertTrue(failMsg, result.containsColumn(f, null)); 2007 assertEquals(failMsg, 1, result.getColumnCells(f, null).size()); 2008 Cell cell = result.getColumnLatestCell(f, null); 2009 assertTrue(failMsg, Bytes.equals(data, 0, data.length, cell.getValueArray(), 2010 cell.getValueOffset(), cell.getValueLength())); 2011 } 2012 } 2013 2014 public void deleteNumericRows(final Table t, final byte[] f, int startRow, int endRow) 2015 throws IOException { 2016 for (int i = startRow; i < endRow; i++) { 2017 byte[] data = Bytes.toBytes(String.valueOf(i)); 2018 Delete delete = new Delete(data); 2019 delete.addFamily(f); 2020 t.delete(delete); 2021 } 2022 } 2023 2024 /** 2025 * Return the number of rows in the given table. 2026 * @param table to count rows 2027 * @return count of rows 2028 */ 2029 public static int countRows(final Table table) throws IOException { 2030 return countRows(table, new Scan()); 2031 } 2032 2033 public static int countRows(final Table table, final Scan scan) throws IOException { 2034 try (ResultScanner results = table.getScanner(scan)) { 2035 int count = 0; 2036 while (results.next() != null) { 2037 count++; 2038 } 2039 return count; 2040 } 2041 } 2042 2043 public static int countRows(final Table table, final byte[]... families) throws IOException { 2044 Scan scan = new Scan(); 2045 for (byte[] family : families) { 2046 scan.addFamily(family); 2047 } 2048 return countRows(table, scan); 2049 } 2050 2051 /** 2052 * Return the number of rows in the given table. 2053 */ 2054 public int countRows(final TableName tableName) throws IOException { 2055 try (Table table = getConnection().getTable(tableName)) { 2056 return countRows(table); 2057 } 2058 } 2059 2060 public static int countRows(final Region region) throws IOException { 2061 return countRows(region, new Scan()); 2062 } 2063 2064 public static int countRows(final Region region, final Scan scan) throws IOException { 2065 try (InternalScanner scanner = region.getScanner(scan)) { 2066 return countRows(scanner); 2067 } 2068 } 2069 2070 public static int countRows(final InternalScanner scanner) throws IOException { 2071 int scannedCount = 0; 2072 List<Cell> results = new ArrayList<>(); 2073 boolean hasMore = true; 2074 while (hasMore) { 2075 hasMore = scanner.next(results); 2076 scannedCount += results.size(); 2077 results.clear(); 2078 } 2079 return scannedCount; 2080 } 2081 2082 /** 2083 * Return an md5 digest of the entire contents of a table. 2084 */ 2085 public String checksumRows(final Table table) throws Exception { 2086 MessageDigest digest = MessageDigest.getInstance("MD5"); 2087 try (ResultScanner results = table.getScanner(new Scan())) { 2088 for (Result res : results) { 2089 digest.update(res.getRow()); 2090 } 2091 } 2092 return digest.toString(); 2093 } 2094 2095 /** All the row values for the data loaded by {@link #loadTable(Table, byte[])} */ 2096 public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB 2097 static { 2098 int i = 0; 2099 for (byte b1 = 'a'; b1 <= 'z'; b1++) { 2100 for (byte b2 = 'a'; b2 <= 'z'; b2++) { 2101 for (byte b3 = 'a'; b3 <= 'z'; b3++) { 2102 ROWS[i][0] = b1; 2103 ROWS[i][1] = b2; 2104 ROWS[i][2] = b3; 2105 i++; 2106 } 2107 } 2108 } 2109 } 2110 2111 public static final byte[][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), 2112 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2113 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2114 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2115 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2116 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2117 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy") }; 2118 2119 public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = { Bytes.toBytes("bbb"), 2120 Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), 2121 Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("jjj"), 2122 Bytes.toBytes("kkk"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), 2123 Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), 2124 Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), 2125 Bytes.toBytes("www"), Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz") }; 2126 2127 /** 2128 * Create rows in hbase:meta for regions of the specified table with the specified start keys. The 2129 * first startKey should be a 0 length byte array if you want to form a proper range of regions. 2130 * @return list of region info for regions added to meta 2131 */ 2132 public List<RegionInfo> createMultiRegionsInMeta(final Configuration conf, 2133 final TableDescriptor htd, byte[][] startKeys) throws IOException { 2134 try (Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 2135 Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); 2136 List<RegionInfo> newRegions = new ArrayList<>(startKeys.length); 2137 MetaTableAccessor.updateTableState(getConnection(), htd.getTableName(), 2138 TableState.State.ENABLED); 2139 // add custom ones 2140 for (int i = 0; i < startKeys.length; i++) { 2141 int j = (i + 1) % startKeys.length; 2142 RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKeys[i]) 2143 .setEndKey(startKeys[j]).build(); 2144 MetaTableAccessor.addRegionsToMeta(getConnection(), Collections.singletonList(hri), 1); 2145 newRegions.add(hri); 2146 } 2147 return newRegions; 2148 } 2149 } 2150 2151 /** 2152 * Create an unmanaged WAL. Be sure to close it when you're through. 2153 */ 2154 public static WAL createWal(final Configuration conf, final Path rootDir, final RegionInfo hri) 2155 throws IOException { 2156 // The WAL subsystem will use the default rootDir rather than the passed in rootDir 2157 // unless I pass along via the conf. 2158 Configuration confForWAL = new Configuration(conf); 2159 confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); 2160 return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri); 2161 } 2162 2163 /** 2164 * Create a region with it's own WAL. Be sure to call 2165 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2166 */ 2167 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2168 final Configuration conf, final TableDescriptor htd) throws IOException { 2169 return createRegionAndWAL(info, rootDir, conf, htd, true); 2170 } 2171 2172 /** 2173 * Create a region with it's own WAL. Be sure to call 2174 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2175 */ 2176 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2177 final Configuration conf, final TableDescriptor htd, BlockCache blockCache) throws IOException { 2178 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2179 region.setBlockCache(blockCache); 2180 region.initialize(); 2181 return region; 2182 } 2183 2184 /** 2185 * Create a region with it's own WAL. Be sure to call 2186 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2187 */ 2188 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2189 final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache) 2190 throws IOException { 2191 HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false); 2192 region.setMobFileCache(mobFileCache); 2193 region.initialize(); 2194 return region; 2195 } 2196 2197 /** 2198 * Create a region with it's own WAL. Be sure to call 2199 * {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)} to clean up all resources. 2200 */ 2201 public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir, 2202 final Configuration conf, final TableDescriptor htd, boolean initialize) throws IOException { 2203 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 2204 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 2205 WAL wal = createWal(conf, rootDir, info); 2206 return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); 2207 } 2208 2209 /** 2210 * Find any other region server which is different from the one identified by parameter 2211 * @return another region server 2212 */ 2213 public HRegionServer getOtherRegionServer(HRegionServer rs) { 2214 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2215 if (!(rst.getRegionServer() == rs)) { 2216 return rst.getRegionServer(); 2217 } 2218 } 2219 return null; 2220 } 2221 2222 /** 2223 * Tool to get the reference to the region server object that holds the region of the specified 2224 * user table. 2225 * @param tableName user table to lookup in hbase:meta 2226 * @return region server that holds it, null if the row doesn't exist 2227 */ 2228 public HRegionServer getRSForFirstRegionInTable(TableName tableName) 2229 throws IOException, InterruptedException { 2230 List<RegionInfo> regions = getAdmin().getRegions(tableName); 2231 if (regions == null || regions.isEmpty()) { 2232 return null; 2233 } 2234 LOG.debug("Found " + regions.size() + " regions for table " + tableName); 2235 2236 byte[] firstRegionName = 2237 regions.stream().filter(r -> !r.isOffline()).map(RegionInfo::getRegionName).findFirst() 2238 .orElseThrow(() -> new IOException("online regions not found in table " + tableName)); 2239 2240 LOG.debug("firstRegionName=" + Bytes.toString(firstRegionName)); 2241 long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, 2242 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 2243 int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2244 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 2245 RetryCounter retrier = new RetryCounter(numRetries + 1, (int) pause, TimeUnit.MICROSECONDS); 2246 while (retrier.shouldRetry()) { 2247 int index = getMiniHBaseCluster().getServerWith(firstRegionName); 2248 if (index != -1) { 2249 return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer(); 2250 } 2251 // Came back -1. Region may not be online yet. Sleep a while. 2252 retrier.sleepUntilNextRetry(); 2253 } 2254 return null; 2255 } 2256 2257 /** 2258 * Starts a <code>MiniMRCluster</code> with a default number of <code>TaskTracker</code>'s. 2259 * @throws IOException When starting the cluster fails. 2260 */ 2261 public MiniMRCluster startMiniMapReduceCluster() throws IOException { 2262 // Set a very high max-disk-utilization percentage to avoid the NodeManagers from failing. 2263 conf.setIfUnset("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", 2264 "99.0"); 2265 startMiniMapReduceCluster(2); 2266 return mrCluster; 2267 } 2268 2269 /** 2270 * Tasktracker has a bug where changing the hadoop.log.dir system property will not change its 2271 * internal static LOG_DIR variable. 2272 */ 2273 private void forceChangeTaskLogDir() { 2274 Field logDirField; 2275 try { 2276 logDirField = TaskLog.class.getDeclaredField("LOG_DIR"); 2277 logDirField.setAccessible(true); 2278 2279 Field modifiersField = ReflectionUtils.getModifiersField(); 2280 modifiersField.setAccessible(true); 2281 modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL); 2282 2283 logDirField.set(null, new File(hadoopLogDir, "userlogs")); 2284 } catch (SecurityException e) { 2285 throw new RuntimeException(e); 2286 } catch (NoSuchFieldException e) { 2287 throw new RuntimeException(e); 2288 } catch (IllegalArgumentException e) { 2289 throw new RuntimeException(e); 2290 } catch (IllegalAccessException e) { 2291 throw new RuntimeException(e); 2292 } 2293 } 2294 2295 /** 2296 * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different 2297 * filesystem. 2298 * @param servers The number of <code>TaskTracker</code>'s to start. 2299 * @throws IOException When starting the cluster fails. 2300 */ 2301 private void startMiniMapReduceCluster(final int servers) throws IOException { 2302 if (mrCluster != null) { 2303 throw new IllegalStateException("MiniMRCluster is already running"); 2304 } 2305 LOG.info("Starting mini mapreduce cluster..."); 2306 setupClusterTestDir(); 2307 createDirsAndSetProperties(); 2308 2309 forceChangeTaskLogDir(); 2310 2311 //// hadoop2 specific settings 2312 // Tests were failing because this process used 6GB of virtual memory and was getting killed. 2313 // we up the VM usable so that processes don't get killed. 2314 conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f); 2315 2316 // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and 2317 // this avoids the problem by disabling speculative task execution in tests. 2318 conf.setBoolean("mapreduce.map.speculative", false); 2319 conf.setBoolean("mapreduce.reduce.speculative", false); 2320 //// 2321 2322 // Yarn container runs in independent JVM. We need to pass the argument manually here if the 2323 // JDK version >= 17. Otherwise, the MiniMRCluster will fail. 2324 if (JVM.getJVMSpecVersion() >= 17) { 2325 String jvmOpts = conf.get("yarn.app.mapreduce.am.command-opts", ""); 2326 conf.set("yarn.app.mapreduce.am.command-opts", 2327 jvmOpts + " --add-opens java.base/java.lang=ALL-UNNAMED"); 2328 } 2329 2330 // Allow the user to override FS URI for this map-reduce cluster to use. 2331 mrCluster = 2332 new MiniMRCluster(servers, FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 2333 1, null, null, new JobConf(this.conf)); 2334 JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster); 2335 if (jobConf == null) { 2336 jobConf = mrCluster.createJobConf(); 2337 } 2338 2339 // Hadoop MiniMR overwrites this while it should not 2340 jobConf.set("mapreduce.cluster.local.dir", conf.get("mapreduce.cluster.local.dir")); 2341 LOG.info("Mini mapreduce cluster started"); 2342 2343 // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings. 2344 // Our HBase MR jobs need several of these settings in order to properly run. So we copy the 2345 // necessary config properties here. YARN-129 required adding a few properties. 2346 conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address")); 2347 // this for mrv2 support; mr1 ignores this 2348 conf.set("mapreduce.framework.name", "yarn"); 2349 conf.setBoolean("yarn.is.minicluster", true); 2350 String rmAddress = jobConf.get("yarn.resourcemanager.address"); 2351 if (rmAddress != null) { 2352 conf.set("yarn.resourcemanager.address", rmAddress); 2353 } 2354 String historyAddress = jobConf.get("mapreduce.jobhistory.address"); 2355 if (historyAddress != null) { 2356 conf.set("mapreduce.jobhistory.address", historyAddress); 2357 } 2358 String schedulerAddress = jobConf.get("yarn.resourcemanager.scheduler.address"); 2359 if (schedulerAddress != null) { 2360 conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress); 2361 } 2362 String mrJobHistoryWebappAddress = jobConf.get("mapreduce.jobhistory.webapp.address"); 2363 if (mrJobHistoryWebappAddress != null) { 2364 conf.set("mapreduce.jobhistory.webapp.address", mrJobHistoryWebappAddress); 2365 } 2366 String yarnRMWebappAddress = jobConf.get("yarn.resourcemanager.webapp.address"); 2367 if (yarnRMWebappAddress != null) { 2368 conf.set("yarn.resourcemanager.webapp.address", yarnRMWebappAddress); 2369 } 2370 } 2371 2372 /** 2373 * Stops the previously started <code>MiniMRCluster</code>. 2374 */ 2375 public void shutdownMiniMapReduceCluster() { 2376 if (mrCluster != null) { 2377 LOG.info("Stopping mini mapreduce cluster..."); 2378 mrCluster.shutdown(); 2379 mrCluster = null; 2380 LOG.info("Mini mapreduce cluster stopped"); 2381 } 2382 // Restore configuration to point to local jobtracker 2383 conf.set("mapreduce.jobtracker.address", "local"); 2384 } 2385 2386 /** 2387 * Create a stubbed out RegionServerService, mainly for getting FS. 2388 */ 2389 public RegionServerServices createMockRegionServerService() throws IOException { 2390 return createMockRegionServerService((ServerName) null); 2391 } 2392 2393 /** 2394 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2395 * TestTokenAuthentication 2396 */ 2397 public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) 2398 throws IOException { 2399 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher()); 2400 rss.setFileSystem(getTestFileSystem()); 2401 rss.setRpcServer(rpc); 2402 return rss; 2403 } 2404 2405 /** 2406 * Create a stubbed out RegionServerService, mainly for getting FS. This version is used by 2407 * TestOpenRegionHandler 2408 */ 2409 public RegionServerServices createMockRegionServerService(ServerName name) throws IOException { 2410 final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name); 2411 rss.setFileSystem(getTestFileSystem()); 2412 return rss; 2413 } 2414 2415 /** 2416 * Expire the Master's session 2417 */ 2418 public void expireMasterSession() throws Exception { 2419 HMaster master = getMiniHBaseCluster().getMaster(); 2420 expireSession(master.getZooKeeper(), false); 2421 } 2422 2423 /** 2424 * Expire a region server's session 2425 * @param index which RS 2426 */ 2427 public void expireRegionServerSession(int index) throws Exception { 2428 HRegionServer rs = getMiniHBaseCluster().getRegionServer(index); 2429 expireSession(rs.getZooKeeper(), false); 2430 decrementMinRegionServerCount(); 2431 } 2432 2433 private void decrementMinRegionServerCount() { 2434 // decrement the count for this.conf, for newly spwaned master 2435 // this.hbaseCluster shares this configuration too 2436 decrementMinRegionServerCount(getConfiguration()); 2437 2438 // each master thread keeps a copy of configuration 2439 for (MasterThread master : getHBaseCluster().getMasterThreads()) { 2440 decrementMinRegionServerCount(master.getMaster().getConfiguration()); 2441 } 2442 } 2443 2444 private void decrementMinRegionServerCount(Configuration conf) { 2445 int currentCount = conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1); 2446 if (currentCount != -1) { 2447 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, Math.max(currentCount - 1, 1)); 2448 } 2449 } 2450 2451 public void expireSession(ZKWatcher nodeZK) throws Exception { 2452 expireSession(nodeZK, false); 2453 } 2454 2455 /** 2456 * Expire a ZooKeeper session as recommended in ZooKeeper documentation 2457 * http://hbase.apache.org/book.html#trouble.zookeeper 2458 * <p/> 2459 * There are issues when doing this: 2460 * <ol> 2461 * <li>http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html</li> 2462 * <li>https://issues.apache.org/jira/browse/ZOOKEEPER-1105</li> 2463 * </ol> 2464 * @param nodeZK - the ZK watcher to expire 2465 * @param checkStatus - true to check if we can create a Table with the current configuration. 2466 */ 2467 public void expireSession(ZKWatcher nodeZK, boolean checkStatus) throws Exception { 2468 Configuration c = new Configuration(this.conf); 2469 String quorumServers = ZKConfig.getZKQuorumServersString(c); 2470 ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper(); 2471 byte[] password = zk.getSessionPasswd(); 2472 long sessionID = zk.getSessionId(); 2473 2474 // Expiry seems to be asynchronous (see comment from P. Hunt in [1]), 2475 // so we create a first watcher to be sure that the 2476 // event was sent. We expect that if our watcher receives the event 2477 // other watchers on the same machine will get is as well. 2478 // When we ask to close the connection, ZK does not close it before 2479 // we receive all the events, so don't have to capture the event, just 2480 // closing the connection should be enough. 2481 ZooKeeper monitor = new ZooKeeper(quorumServers, 1000, new org.apache.zookeeper.Watcher() { 2482 @Override 2483 public void process(WatchedEvent watchedEvent) { 2484 LOG.info("Monitor ZKW received event=" + watchedEvent); 2485 } 2486 }, sessionID, password); 2487 2488 // Making it expire 2489 ZooKeeper newZK = 2490 new ZooKeeper(quorumServers, 1000, EmptyWatcher.instance, sessionID, password); 2491 2492 // ensure that we have connection to the server before closing down, otherwise 2493 // the close session event will be eaten out before we start CONNECTING state 2494 long start = EnvironmentEdgeManager.currentTime(); 2495 while ( 2496 newZK.getState() != States.CONNECTED && EnvironmentEdgeManager.currentTime() - start < 1000 2497 ) { 2498 Thread.sleep(1); 2499 } 2500 newZK.close(); 2501 LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID)); 2502 2503 // Now closing & waiting to be sure that the clients get it. 2504 monitor.close(); 2505 2506 if (checkStatus) { 2507 getConnection().getTable(TableName.META_TABLE_NAME).close(); 2508 } 2509 } 2510 2511 /** 2512 * Get the Mini HBase cluster. 2513 * @return hbase cluster 2514 * @see #getHBaseClusterInterface() 2515 */ 2516 public SingleProcessHBaseCluster getHBaseCluster() { 2517 return getMiniHBaseCluster(); 2518 } 2519 2520 /** 2521 * Returns the HBaseCluster instance. 2522 * <p> 2523 * Returned object can be any of the subclasses of HBaseCluster, and the tests referring this 2524 * should not assume that the cluster is a mini cluster or a distributed one. If the test only 2525 * works on a mini cluster, then specific method {@link #getMiniHBaseCluster()} can be used 2526 * instead w/o the need to type-cast. 2527 */ 2528 public HBaseClusterInterface getHBaseClusterInterface() { 2529 // implementation note: we should rename this method as #getHBaseCluster(), 2530 // but this would require refactoring 90+ calls. 2531 return hbaseCluster; 2532 } 2533 2534 /** 2535 * Resets the connections so that the next time getConnection() is called, a new connection is 2536 * created. This is needed in cases where the entire cluster / all the masters are shutdown and 2537 * the connection is not valid anymore. 2538 * <p/> 2539 * TODO: There should be a more coherent way of doing this. Unfortunately the way tests are 2540 * written, not all start() stop() calls go through this class. Most tests directly operate on the 2541 * underlying mini/local hbase cluster. That makes it difficult for this wrapper class to maintain 2542 * the connection state automatically. Cleaning this is a much bigger refactor. 2543 */ 2544 public void invalidateConnection() throws IOException { 2545 closeConnection(); 2546 // Update the master addresses if they changed. 2547 final String masterConfigBefore = conf.get(HConstants.MASTER_ADDRS_KEY); 2548 final String masterConfAfter = getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY); 2549 LOG.info("Invalidated connection. Updating master addresses before: {} after: {}", 2550 masterConfigBefore, masterConfAfter); 2551 conf.set(HConstants.MASTER_ADDRS_KEY, 2552 getMiniHBaseCluster().getConf().get(HConstants.MASTER_ADDRS_KEY)); 2553 } 2554 2555 /** 2556 * Get a shared Connection to the cluster. this method is thread safe. 2557 * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. 2558 */ 2559 public Connection getConnection() throws IOException { 2560 return getAsyncConnection().toConnection(); 2561 } 2562 2563 /** 2564 * Get a assigned Connection to the cluster. this method is thread safe. 2565 * @param user assigned user 2566 * @return A Connection with assigned user. 2567 */ 2568 public Connection getConnection(User user) throws IOException { 2569 return getAsyncConnection(user).toConnection(); 2570 } 2571 2572 /** 2573 * Get a shared AsyncClusterConnection to the cluster. this method is thread safe. 2574 * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown 2575 * of cluster. 2576 */ 2577 public AsyncClusterConnection getAsyncConnection() throws IOException { 2578 try { 2579 return asyncConnection.updateAndGet(connection -> { 2580 if (connection == null) { 2581 try { 2582 User user = UserProvider.instantiate(conf).getCurrent(); 2583 connection = getAsyncConnection(user); 2584 } catch (IOException ioe) { 2585 throw new UncheckedIOException("Failed to create connection", ioe); 2586 } 2587 } 2588 return connection; 2589 }); 2590 } catch (UncheckedIOException exception) { 2591 throw exception.getCause(); 2592 } 2593 } 2594 2595 /** 2596 * Get a assigned AsyncClusterConnection to the cluster. this method is thread safe. 2597 * @param user assigned user 2598 * @return An AsyncClusterConnection with assigned user. 2599 */ 2600 public AsyncClusterConnection getAsyncConnection(User user) throws IOException { 2601 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user); 2602 } 2603 2604 public void closeConnection() throws IOException { 2605 if (hbaseAdmin != null) { 2606 Closeables.close(hbaseAdmin, true); 2607 hbaseAdmin = null; 2608 } 2609 AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null); 2610 if (asyncConnection != null) { 2611 Closeables.close(asyncConnection, true); 2612 } 2613 } 2614 2615 /** 2616 * Returns an Admin instance which is shared between HBaseTestingUtility instance users. Closing 2617 * it has no effect, it will be closed automatically when the cluster shutdowns 2618 */ 2619 public Admin getAdmin() throws IOException { 2620 if (hbaseAdmin == null) { 2621 this.hbaseAdmin = getConnection().getAdmin(); 2622 } 2623 return hbaseAdmin; 2624 } 2625 2626 private Admin hbaseAdmin = null; 2627 2628 /** 2629 * Returns an {@link Hbck} instance. Needs be closed when done. 2630 */ 2631 public Hbck getHbck() throws IOException { 2632 return getConnection().getHbck(); 2633 } 2634 2635 /** 2636 * Unassign the named region. 2637 * @param regionName The region to unassign. 2638 */ 2639 public void unassignRegion(String regionName) throws IOException { 2640 unassignRegion(Bytes.toBytes(regionName)); 2641 } 2642 2643 /** 2644 * Unassign the named region. 2645 * @param regionName The region to unassign. 2646 */ 2647 public void unassignRegion(byte[] regionName) throws IOException { 2648 getAdmin().unassign(regionName); 2649 } 2650 2651 /** 2652 * Closes the region containing the given row. 2653 * @param row The row to find the containing region. 2654 * @param table The table to find the region. 2655 */ 2656 public void unassignRegionByRow(String row, RegionLocator table) throws IOException { 2657 unassignRegionByRow(Bytes.toBytes(row), table); 2658 } 2659 2660 /** 2661 * Closes the region containing the given row. 2662 * @param row The row to find the containing region. 2663 * @param table The table to find the region. 2664 */ 2665 public void unassignRegionByRow(byte[] row, RegionLocator table) throws IOException { 2666 HRegionLocation hrl = table.getRegionLocation(row); 2667 unassignRegion(hrl.getRegion().getRegionName()); 2668 } 2669 2670 /** 2671 * Retrieves a splittable region randomly from tableName 2672 * @param tableName name of table 2673 * @param maxAttempts maximum number of attempts, unlimited for value of -1 2674 * @return the HRegion chosen, null if none was found within limit of maxAttempts 2675 */ 2676 public HRegion getSplittableRegion(TableName tableName, int maxAttempts) { 2677 List<HRegion> regions = getHBaseCluster().getRegions(tableName); 2678 int regCount = regions.size(); 2679 Set<Integer> attempted = new HashSet<>(); 2680 int idx; 2681 int attempts = 0; 2682 do { 2683 regions = getHBaseCluster().getRegions(tableName); 2684 if (regCount != regions.size()) { 2685 // if there was region movement, clear attempted Set 2686 attempted.clear(); 2687 } 2688 regCount = regions.size(); 2689 // There are chances that before we get the region for the table from an RS the region may 2690 // be going for CLOSE. This may be because online schema change is enabled 2691 if (regCount > 0) { 2692 idx = ThreadLocalRandom.current().nextInt(regCount); 2693 // if we have just tried this region, there is no need to try again 2694 if (attempted.contains(idx)) { 2695 continue; 2696 } 2697 HRegion region = regions.get(idx); 2698 if (region.checkSplit().isPresent()) { 2699 return region; 2700 } 2701 attempted.add(idx); 2702 } 2703 attempts++; 2704 } while (maxAttempts == -1 || attempts < maxAttempts); 2705 return null; 2706 } 2707 2708 public MiniDFSCluster getDFSCluster() { 2709 return dfsCluster; 2710 } 2711 2712 public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException { 2713 setDFSCluster(cluster, true); 2714 } 2715 2716 /** 2717 * Set the MiniDFSCluster 2718 * @param cluster cluster to use 2719 * @param requireDown require the that cluster not be "up" (MiniDFSCluster#isClusterUp) before it 2720 * is set. 2721 * @throws IllegalStateException if the passed cluster is up when it is required to be down 2722 * @throws IOException if the FileSystem could not be set from the passed dfs cluster 2723 */ 2724 public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown) 2725 throws IllegalStateException, IOException { 2726 if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) { 2727 throw new IllegalStateException("DFSCluster is already running! Shut it down first."); 2728 } 2729 this.dfsCluster = cluster; 2730 this.setFs(); 2731 } 2732 2733 public FileSystem getTestFileSystem() throws IOException { 2734 return HFileSystem.get(conf); 2735 } 2736 2737 /** 2738 * Wait until all regions in a table have been assigned. Waits default timeout before giving up 2739 * (30 seconds). 2740 * @param table Table to wait on. 2741 */ 2742 public void waitTableAvailable(TableName table) throws InterruptedException, IOException { 2743 waitTableAvailable(table.getName(), 30000); 2744 } 2745 2746 public void waitTableAvailable(TableName table, long timeoutMillis) 2747 throws InterruptedException, IOException { 2748 waitFor(timeoutMillis, predicateTableAvailable(table)); 2749 } 2750 2751 /** 2752 * Wait until all regions in a table have been assigned 2753 * @param table Table to wait on. 2754 * @param timeoutMillis Timeout. 2755 */ 2756 public void waitTableAvailable(byte[] table, long timeoutMillis) 2757 throws InterruptedException, IOException { 2758 waitFor(timeoutMillis, predicateTableAvailable(TableName.valueOf(table))); 2759 } 2760 2761 public String explainTableAvailability(TableName tableName) throws IOException { 2762 StringBuilder msg = 2763 new StringBuilder(explainTableState(tableName, TableState.State.ENABLED)).append(", "); 2764 if (getHBaseCluster().getMaster().isAlive()) { 2765 Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager() 2766 .getRegionStates().getRegionAssignments(); 2767 final List<Pair<RegionInfo, ServerName>> metaLocations = 2768 MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName); 2769 for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) { 2770 RegionInfo hri = metaLocation.getFirst(); 2771 ServerName sn = metaLocation.getSecond(); 2772 if (!assignments.containsKey(hri)) { 2773 msg.append(", region ").append(hri) 2774 .append(" not assigned, but found in meta, it expected to be on ").append(sn); 2775 } else if (sn == null) { 2776 msg.append(", region ").append(hri).append(" assigned, but has no server in meta"); 2777 } else if (!sn.equals(assignments.get(hri))) { 2778 msg.append(", region ").append(hri) 2779 .append(" assigned, but has different servers in meta and AM ( ").append(sn) 2780 .append(" <> ").append(assignments.get(hri)); 2781 } 2782 } 2783 } 2784 return msg.toString(); 2785 } 2786 2787 public String explainTableState(final TableName table, TableState.State state) 2788 throws IOException { 2789 TableState tableState = MetaTableAccessor.getTableState(getConnection(), table); 2790 if (tableState == null) { 2791 return "TableState in META: No table state in META for table " + table 2792 + " last state in meta (including deleted is " + findLastTableState(table) + ")"; 2793 } else if (!tableState.inStates(state)) { 2794 return "TableState in META: Not " + state + " state, but " + tableState; 2795 } else { 2796 return "TableState in META: OK"; 2797 } 2798 } 2799 2800 @Nullable 2801 public TableState findLastTableState(final TableName table) throws IOException { 2802 final AtomicReference<TableState> lastTableState = new AtomicReference<>(null); 2803 ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { 2804 @Override 2805 public boolean visit(Result r) throws IOException { 2806 if (!Arrays.equals(r.getRow(), table.getName())) { 2807 return false; 2808 } 2809 TableState state = CatalogFamilyFormat.getTableState(r); 2810 if (state != null) { 2811 lastTableState.set(state); 2812 } 2813 return true; 2814 } 2815 }; 2816 MetaTableAccessor.scanMeta(getConnection(), null, null, ClientMetaTableAccessor.QueryType.TABLE, 2817 Integer.MAX_VALUE, visitor); 2818 return lastTableState.get(); 2819 } 2820 2821 /** 2822 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 2823 * have been all assigned. Will timeout after default period (30 seconds) Tolerates nonexistent 2824 * table. 2825 * @param table the table to wait on. 2826 * @throws InterruptedException if interrupted while waiting 2827 * @throws IOException if an IO problem is encountered 2828 */ 2829 public void waitTableEnabled(TableName table) throws InterruptedException, IOException { 2830 waitTableEnabled(table, 30000); 2831 } 2832 2833 /** 2834 * Waits for a table to be 'enabled'. Enabled means that table is set as 'enabled' and the regions 2835 * have been all assigned. 2836 * @see #waitTableEnabled(TableName, long) 2837 * @param table Table to wait on. 2838 * @param timeoutMillis Time to wait on it being marked enabled. 2839 */ 2840 public void waitTableEnabled(byte[] table, long timeoutMillis) 2841 throws InterruptedException, IOException { 2842 waitTableEnabled(TableName.valueOf(table), timeoutMillis); 2843 } 2844 2845 public void waitTableEnabled(TableName table, long timeoutMillis) throws IOException { 2846 waitFor(timeoutMillis, predicateTableEnabled(table)); 2847 } 2848 2849 /** 2850 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' Will timeout 2851 * after default period (30 seconds) 2852 * @param table Table to wait on. 2853 */ 2854 public void waitTableDisabled(byte[] table) throws InterruptedException, IOException { 2855 waitTableDisabled(table, 30000); 2856 } 2857 2858 public void waitTableDisabled(TableName table, long millisTimeout) 2859 throws InterruptedException, IOException { 2860 waitFor(millisTimeout, predicateTableDisabled(table)); 2861 } 2862 2863 /** 2864 * Waits for a table to be 'disabled'. Disabled means that table is set as 'disabled' 2865 * @param table Table to wait on. 2866 * @param timeoutMillis Time to wait on it being marked disabled. 2867 */ 2868 public void waitTableDisabled(byte[] table, long timeoutMillis) 2869 throws InterruptedException, IOException { 2870 waitTableDisabled(TableName.valueOf(table), timeoutMillis); 2871 } 2872 2873 /** 2874 * Make sure that at least the specified number of region servers are running 2875 * @param num minimum number of region servers that should be running 2876 * @return true if we started some servers 2877 */ 2878 public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { 2879 boolean startedServer = false; 2880 SingleProcessHBaseCluster hbaseCluster = getMiniHBaseCluster(); 2881 for (int i = hbaseCluster.getLiveRegionServerThreads().size(); i < num; ++i) { 2882 LOG.info("Started new server=" + hbaseCluster.startRegionServer()); 2883 startedServer = true; 2884 } 2885 2886 return startedServer; 2887 } 2888 2889 /** 2890 * Make sure that at least the specified number of region servers are running. We don't count the 2891 * ones that are currently stopping or are stopped. 2892 * @param num minimum number of region servers that should be running 2893 * @return true if we started some servers 2894 */ 2895 public boolean ensureSomeNonStoppedRegionServersAvailable(final int num) throws IOException { 2896 boolean startedServer = ensureSomeRegionServersAvailable(num); 2897 2898 int nonStoppedServers = 0; 2899 for (JVMClusterUtil.RegionServerThread rst : getMiniHBaseCluster().getRegionServerThreads()) { 2900 2901 HRegionServer hrs = rst.getRegionServer(); 2902 if (hrs.isStopping() || hrs.isStopped()) { 2903 LOG.info("A region server is stopped or stopping:" + hrs); 2904 } else { 2905 nonStoppedServers++; 2906 } 2907 } 2908 for (int i = nonStoppedServers; i < num; ++i) { 2909 LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer()); 2910 startedServer = true; 2911 } 2912 return startedServer; 2913 } 2914 2915 /** 2916 * This method clones the passed <code>c</code> configuration setting a new user into the clone. 2917 * Use it getting new instances of FileSystem. Only works for DistributedFileSystem w/o Kerberos. 2918 * @param c Initial configuration 2919 * @param differentiatingSuffix Suffix to differentiate this user from others. 2920 * @return A new configuration instance with a different user set into it. 2921 */ 2922 public static User getDifferentUser(final Configuration c, final String differentiatingSuffix) 2923 throws IOException { 2924 FileSystem currentfs = FileSystem.get(c); 2925 if (!(currentfs instanceof DistributedFileSystem) || User.isHBaseSecurityEnabled(c)) { 2926 return User.getCurrent(); 2927 } 2928 // Else distributed filesystem. Make a new instance per daemon. Below 2929 // code is taken from the AppendTestUtil over in hdfs. 2930 String username = User.getCurrent().getName() + differentiatingSuffix; 2931 User user = User.createUserForTesting(c, username, new String[] { "supergroup" }); 2932 return user; 2933 } 2934 2935 public static NavigableSet<String> getAllOnlineRegions(SingleProcessHBaseCluster cluster) 2936 throws IOException { 2937 NavigableSet<String> online = new TreeSet<>(); 2938 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 2939 try { 2940 for (RegionInfo region : ProtobufUtil 2941 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 2942 online.add(region.getRegionNameAsString()); 2943 } 2944 } catch (RegionServerStoppedException e) { 2945 // That's fine. 2946 } 2947 } 2948 return online; 2949 } 2950 2951 /** 2952 * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append its hard-coded to 5 and makes tests 2953 * linger. Here is the exception you'll see: 2954 * 2955 * <pre> 2956 * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/wal.1276627923013 block 2957 * blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block 2958 * blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 2959 * failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... 2960 * </pre> 2961 * 2962 * @param stream A DFSClient.DFSOutputStream. 2963 */ 2964 public static void setMaxRecoveryErrorCount(final OutputStream stream, final int max) { 2965 try { 2966 Class<?>[] clazzes = DFSClient.class.getDeclaredClasses(); 2967 for (Class<?> clazz : clazzes) { 2968 String className = clazz.getSimpleName(); 2969 if (className.equals("DFSOutputStream")) { 2970 if (clazz.isInstance(stream)) { 2971 Field maxRecoveryErrorCountField = 2972 stream.getClass().getDeclaredField("maxRecoveryErrorCount"); 2973 maxRecoveryErrorCountField.setAccessible(true); 2974 maxRecoveryErrorCountField.setInt(stream, max); 2975 break; 2976 } 2977 } 2978 } 2979 } catch (Exception e) { 2980 LOG.info("Could not set max recovery field", e); 2981 } 2982 } 2983 2984 /** 2985 * Uses directly the assignment manager to assign the region. and waits until the specified region 2986 * has completed assignment. 2987 * @return true if the region is assigned false otherwise. 2988 */ 2989 public boolean assignRegion(final RegionInfo regionInfo) 2990 throws IOException, InterruptedException { 2991 final AssignmentManager am = getHBaseCluster().getMaster().getAssignmentManager(); 2992 am.assign(regionInfo); 2993 return AssignmentTestingUtil.waitForAssignment(am, regionInfo); 2994 } 2995 2996 /** 2997 * Move region to destination server and wait till region is completely moved and online 2998 * @param destRegion region to move 2999 * @param destServer destination server of the region 3000 */ 3001 public void moveRegionAndWait(RegionInfo destRegion, ServerName destServer) 3002 throws InterruptedException, IOException { 3003 HMaster master = getMiniHBaseCluster().getMaster(); 3004 // TODO: Here we start the move. The move can take a while. 3005 getAdmin().move(destRegion.getEncodedNameAsBytes(), destServer); 3006 while (true) { 3007 ServerName serverName = 3008 master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(destRegion); 3009 if (serverName != null && serverName.equals(destServer)) { 3010 assertRegionOnServer(destRegion, serverName, 2000); 3011 break; 3012 } 3013 Thread.sleep(10); 3014 } 3015 } 3016 3017 /** 3018 * Wait until all regions for a table in hbase:meta have a non-empty info:server, up to a 3019 * configuable timeout value (default is 60 seconds) This means all regions have been deployed, 3020 * master has been informed and updated hbase:meta with the regions deployed server. 3021 * @param tableName the table name 3022 */ 3023 public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException { 3024 waitUntilAllRegionsAssigned(tableName, 3025 this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000)); 3026 } 3027 3028 /** 3029 * Waith until all system table's regions get assigned 3030 */ 3031 public void waitUntilAllSystemRegionsAssigned() throws IOException { 3032 waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); 3033 } 3034 3035 /** 3036 * Wait until all regions for a table in hbase:meta have a non-empty info:server, or until 3037 * timeout. This means all regions have been deployed, master has been informed and updated 3038 * hbase:meta with the regions deployed server. 3039 * @param tableName the table name 3040 * @param timeout timeout, in milliseconds 3041 */ 3042 public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout) 3043 throws IOException { 3044 if (!TableName.isMetaTableName(tableName)) { 3045 try (final Table meta = getConnection().getTable(TableName.META_TABLE_NAME)) { 3046 LOG.debug("Waiting until all regions of table " + tableName + " get assigned. Timeout = " 3047 + timeout + "ms"); 3048 waitFor(timeout, 200, true, new ExplainingPredicate<IOException>() { 3049 @Override 3050 public String explainFailure() throws IOException { 3051 return explainTableAvailability(tableName); 3052 } 3053 3054 @Override 3055 public boolean evaluate() throws IOException { 3056 Scan scan = new Scan(); 3057 scan.addFamily(HConstants.CATALOG_FAMILY); 3058 boolean tableFound = false; 3059 try (ResultScanner s = meta.getScanner(scan)) { 3060 for (Result r; (r = s.next()) != null;) { 3061 byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); 3062 RegionInfo info = RegionInfo.parseFromOrNull(b); 3063 if (info != null && info.getTable().equals(tableName)) { 3064 // Get server hosting this region from catalog family. Return false if no server 3065 // hosting this region, or if the server hosting this region was recently killed 3066 // (for fault tolerance testing). 3067 tableFound = true; 3068 byte[] server = 3069 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); 3070 if (server == null) { 3071 return false; 3072 } else { 3073 byte[] startCode = 3074 r.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER); 3075 ServerName serverName = 3076 ServerName.valueOf(Bytes.toString(server).replaceFirst(":", ",") + "," 3077 + Bytes.toLong(startCode)); 3078 if ( 3079 !getHBaseClusterInterface().isDistributedCluster() 3080 && getHBaseCluster().isKilledRS(serverName) 3081 ) { 3082 return false; 3083 } 3084 } 3085 if (RegionStateStore.getRegionState(r, info) != RegionState.State.OPEN) { 3086 return false; 3087 } 3088 } 3089 } 3090 } 3091 if (!tableFound) { 3092 LOG.warn( 3093 "Didn't find the entries for table " + tableName + " in meta, already deleted?"); 3094 } 3095 return tableFound; 3096 } 3097 }); 3098 } 3099 } 3100 LOG.info("All regions for table " + tableName + " assigned to meta. Checking AM states."); 3101 // check from the master state if we are using a mini cluster 3102 if (!getHBaseClusterInterface().isDistributedCluster()) { 3103 // So, all regions are in the meta table but make sure master knows of the assignments before 3104 // returning -- sometimes this can lag. 3105 HMaster master = getHBaseCluster().getMaster(); 3106 final RegionStates states = master.getAssignmentManager().getRegionStates(); 3107 waitFor(timeout, 200, new ExplainingPredicate<IOException>() { 3108 @Override 3109 public String explainFailure() throws IOException { 3110 return explainTableAvailability(tableName); 3111 } 3112 3113 @Override 3114 public boolean evaluate() throws IOException { 3115 List<RegionInfo> hris = states.getRegionsOfTable(tableName); 3116 return hris != null && !hris.isEmpty(); 3117 } 3118 }); 3119 } 3120 LOG.info("All regions for table " + tableName + " assigned."); 3121 } 3122 3123 /** 3124 * Do a small get/scan against one store. This is required because store has no actual methods of 3125 * querying itself, and relies on StoreScanner. 3126 */ 3127 public static List<Cell> getFromStoreFile(HStore store, Get get) throws IOException { 3128 Scan scan = new Scan(get); 3129 InternalScanner scanner = (InternalScanner) store.getScanner(scan, 3130 scan.getFamilyMap().get(store.getColumnFamilyDescriptor().getName()), 3131 // originally MultiVersionConcurrencyControl.resetThreadReadPoint() was called to set 3132 // readpoint 0. 3133 0); 3134 3135 List<Cell> result = new ArrayList<>(); 3136 scanner.next(result); 3137 if (!result.isEmpty()) { 3138 // verify that we are on the row we want: 3139 Cell kv = result.get(0); 3140 if (!CellUtil.matchingRows(kv, get.getRow())) { 3141 result.clear(); 3142 } 3143 } 3144 scanner.close(); 3145 return result; 3146 } 3147 3148 /** 3149 * Create region split keys between startkey and endKey 3150 * @param numRegions the number of regions to be created. it has to be greater than 3. 3151 * @return resulting split keys 3152 */ 3153 public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions) { 3154 assertTrue(numRegions > 3); 3155 byte[][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3); 3156 byte[][] result = new byte[tmpSplitKeys.length + 1][]; 3157 System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length); 3158 result[0] = HConstants.EMPTY_BYTE_ARRAY; 3159 return result; 3160 } 3161 3162 /** 3163 * Do a small get/scan against one store. This is required because store has no actual methods of 3164 * querying itself, and relies on StoreScanner. 3165 */ 3166 public static List<Cell> getFromStoreFile(HStore store, byte[] row, NavigableSet<byte[]> columns) 3167 throws IOException { 3168 Get get = new Get(row); 3169 Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); 3170 s.put(store.getColumnFamilyDescriptor().getName(), columns); 3171 3172 return getFromStoreFile(store, get); 3173 } 3174 3175 public static void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected, 3176 final List<? extends Cell> actual) { 3177 final int eLen = expected.size(); 3178 final int aLen = actual.size(); 3179 final int minLen = Math.min(eLen, aLen); 3180 3181 int i = 0; 3182 while ( 3183 i < minLen && CellComparator.getInstance().compare(expected.get(i), actual.get(i)) == 0 3184 ) { 3185 i++; 3186 } 3187 3188 if (additionalMsg == null) { 3189 additionalMsg = ""; 3190 } 3191 if (!additionalMsg.isEmpty()) { 3192 additionalMsg = ". " + additionalMsg; 3193 } 3194 3195 if (eLen != aLen || i != minLen) { 3196 throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": " 3197 + safeGetAsStr(expected, i) + " (length " + eLen + ") vs. " + safeGetAsStr(actual, i) 3198 + " (length " + aLen + ")" + additionalMsg); 3199 } 3200 } 3201 3202 public static <T> String safeGetAsStr(List<T> lst, int i) { 3203 if (0 <= i && i < lst.size()) { 3204 return lst.get(i).toString(); 3205 } else { 3206 return "<out_of_range>"; 3207 } 3208 } 3209 3210 public String getRpcConnnectionURI() throws UnknownHostException { 3211 return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); 3212 } 3213 3214 public String getZkConnectionURI() { 3215 return "hbase+zk://" + conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" 3216 + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3217 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3218 } 3219 3220 /** 3221 * Get the zk based cluster key for this cluster. 3222 * @deprecated since 2.7.0, will be removed in 4.0.0. Now we use connection uri to specify the 3223 * connection info of a cluster. Keep here only for compatibility. 3224 * @see #getRpcConnnectionURI() 3225 * @see #getZkConnectionURI() 3226 */ 3227 @Deprecated 3228 public String getClusterKey() { 3229 return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) 3230 + ":" 3231 + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); 3232 } 3233 3234 /** 3235 * Creates a random table with the given parameters 3236 */ 3237 public Table createRandomTable(TableName tableName, final Collection<String> families, 3238 final int maxVersions, final int numColsPerRow, final int numFlushes, final int numRegions, 3239 final int numRowsPerFlush) throws IOException, InterruptedException { 3240 LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + " regions, " 3241 + numFlushes + " storefiles per region, " + numRowsPerFlush + " rows per flush, maxVersions=" 3242 + maxVersions + "\n"); 3243 3244 final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); 3245 final int numCF = families.size(); 3246 final byte[][] cfBytes = new byte[numCF][]; 3247 { 3248 int cfIndex = 0; 3249 for (String cf : families) { 3250 cfBytes[cfIndex++] = Bytes.toBytes(cf); 3251 } 3252 } 3253 3254 final int actualStartKey = 0; 3255 final int actualEndKey = Integer.MAX_VALUE; 3256 final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; 3257 final int splitStartKey = actualStartKey + keysPerRegion; 3258 final int splitEndKey = actualEndKey - keysPerRegion; 3259 final String keyFormat = "%08x"; 3260 final Table table = createTable(tableName, cfBytes, maxVersions, 3261 Bytes.toBytes(String.format(keyFormat, splitStartKey)), 3262 Bytes.toBytes(String.format(keyFormat, splitEndKey)), numRegions); 3263 3264 if (hbaseCluster != null) { 3265 getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME); 3266 } 3267 3268 BufferedMutator mutator = getConnection().getBufferedMutator(tableName); 3269 3270 for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { 3271 for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { 3272 final byte[] row = Bytes.toBytes( 3273 String.format(keyFormat, actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); 3274 3275 Put put = new Put(row); 3276 Delete del = new Delete(row); 3277 for (int iCol = 0; iCol < numColsPerRow; ++iCol) { 3278 final byte[] cf = cfBytes[rand.nextInt(numCF)]; 3279 final long ts = rand.nextInt(); 3280 final byte[] qual = Bytes.toBytes("col" + iCol); 3281 if (rand.nextBoolean()) { 3282 final byte[] value = 3283 Bytes.toBytes("value_for_row_" + iRow + "_cf_" + Bytes.toStringBinary(cf) + "_col_" 3284 + iCol + "_ts_" + ts + "_random_" + rand.nextLong()); 3285 put.addColumn(cf, qual, ts, value); 3286 } else if (rand.nextDouble() < 0.8) { 3287 del.addColumn(cf, qual, ts); 3288 } else { 3289 del.addColumns(cf, qual, ts); 3290 } 3291 } 3292 3293 if (!put.isEmpty()) { 3294 mutator.mutate(put); 3295 } 3296 3297 if (!del.isEmpty()) { 3298 mutator.mutate(del); 3299 } 3300 } 3301 LOG.info("Initiating flush #" + iFlush + " for table " + tableName); 3302 mutator.flush(); 3303 if (hbaseCluster != null) { 3304 getMiniHBaseCluster().flushcache(table.getName()); 3305 } 3306 } 3307 mutator.close(); 3308 3309 return table; 3310 } 3311 3312 public static int randomFreePort() { 3313 return HBaseCommonTestingUtil.randomFreePort(); 3314 } 3315 3316 public static String randomMultiCastAddress() { 3317 return "226.1.1." + ThreadLocalRandom.current().nextInt(254); 3318 } 3319 3320 public static void waitForHostPort(String host, int port) throws IOException { 3321 final int maxTimeMs = 10000; 3322 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; 3323 IOException savedException = null; 3324 LOG.info("Waiting for server at " + host + ":" + port); 3325 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { 3326 try { 3327 Socket sock = new Socket(InetAddress.getByName(host), port); 3328 sock.close(); 3329 savedException = null; 3330 LOG.info("Server at " + host + ":" + port + " is available"); 3331 break; 3332 } catch (UnknownHostException e) { 3333 throw new IOException("Failed to look up " + host, e); 3334 } catch (IOException e) { 3335 savedException = e; 3336 } 3337 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); 3338 } 3339 3340 if (savedException != null) { 3341 throw savedException; 3342 } 3343 } 3344 3345 public static int getMetaRSPort(Connection connection) throws IOException { 3346 try (RegionLocator locator = connection.getRegionLocator(TableName.META_TABLE_NAME)) { 3347 return locator.getRegionLocation(Bytes.toBytes("")).getPort(); 3348 } 3349 } 3350 3351 /** 3352 * Due to async racing issue, a region may not be in the online region list of a region server 3353 * yet, after the assignment znode is deleted and the new assignment is recorded in master. 3354 */ 3355 public void assertRegionOnServer(final RegionInfo hri, final ServerName server, 3356 final long timeout) throws IOException, InterruptedException { 3357 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3358 while (true) { 3359 List<RegionInfo> regions = getAdmin().getRegions(server); 3360 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) return; 3361 long now = EnvironmentEdgeManager.currentTime(); 3362 if (now > timeoutTime) break; 3363 Thread.sleep(10); 3364 } 3365 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3366 } 3367 3368 /** 3369 * Check to make sure the region is open on the specified region server, but not on any other one. 3370 */ 3371 public void assertRegionOnlyOnServer(final RegionInfo hri, final ServerName server, 3372 final long timeout) throws IOException, InterruptedException { 3373 long timeoutTime = EnvironmentEdgeManager.currentTime() + timeout; 3374 while (true) { 3375 List<RegionInfo> regions = getAdmin().getRegions(server); 3376 if (regions.stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0)) { 3377 List<JVMClusterUtil.RegionServerThread> rsThreads = 3378 getHBaseCluster().getLiveRegionServerThreads(); 3379 for (JVMClusterUtil.RegionServerThread rsThread : rsThreads) { 3380 HRegionServer rs = rsThread.getRegionServer(); 3381 if (server.equals(rs.getServerName())) { 3382 continue; 3383 } 3384 Collection<HRegion> hrs = rs.getOnlineRegionsLocalContext(); 3385 for (HRegion r : hrs) { 3386 assertTrue("Region should not be double assigned", 3387 r.getRegionInfo().getRegionId() != hri.getRegionId()); 3388 } 3389 } 3390 return; // good, we are happy 3391 } 3392 long now = EnvironmentEdgeManager.currentTime(); 3393 if (now > timeoutTime) break; 3394 Thread.sleep(10); 3395 } 3396 fail("Could not find region " + hri.getRegionNameAsString() + " on server " + server); 3397 } 3398 3399 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException { 3400 TableDescriptor td = 3401 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3402 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3403 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td); 3404 } 3405 3406 public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd, 3407 BlockCache blockCache) throws IOException { 3408 TableDescriptor td = 3409 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build(); 3410 RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build(); 3411 return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache); 3412 } 3413 3414 public static void setFileSystemURI(String fsURI) { 3415 FS_URI = fsURI; 3416 } 3417 3418 /** 3419 * Returns a {@link Predicate} for checking that there are no regions in transition in master 3420 */ 3421 public ExplainingPredicate<IOException> predicateNoRegionsInTransition() { 3422 return new ExplainingPredicate<IOException>() { 3423 @Override 3424 public String explainFailure() throws IOException { 3425 final RegionStates regionStates = 3426 getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 3427 return "found in transition: " + regionStates.getRegionsInTransition().toString(); 3428 } 3429 3430 @Override 3431 public boolean evaluate() throws IOException { 3432 HMaster master = getMiniHBaseCluster().getMaster(); 3433 if (master == null) return false; 3434 AssignmentManager am = master.getAssignmentManager(); 3435 if (am == null) return false; 3436 return !am.hasRegionsInTransition(); 3437 } 3438 }; 3439 } 3440 3441 /** 3442 * Returns a {@link Predicate} for checking that table is enabled 3443 */ 3444 public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) { 3445 return new ExplainingPredicate<IOException>() { 3446 @Override 3447 public String explainFailure() throws IOException { 3448 return explainTableState(tableName, TableState.State.ENABLED); 3449 } 3450 3451 @Override 3452 public boolean evaluate() throws IOException { 3453 return getAdmin().tableExists(tableName) && getAdmin().isTableEnabled(tableName); 3454 } 3455 }; 3456 } 3457 3458 /** 3459 * Returns a {@link Predicate} for checking that table is enabled 3460 */ 3461 public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) { 3462 return new ExplainingPredicate<IOException>() { 3463 @Override 3464 public String explainFailure() throws IOException { 3465 return explainTableState(tableName, TableState.State.DISABLED); 3466 } 3467 3468 @Override 3469 public boolean evaluate() throws IOException { 3470 return getAdmin().isTableDisabled(tableName); 3471 } 3472 }; 3473 } 3474 3475 /** 3476 * Returns a {@link Predicate} for checking that table is enabled 3477 */ 3478 public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) { 3479 return new ExplainingPredicate<IOException>() { 3480 @Override 3481 public String explainFailure() throws IOException { 3482 return explainTableAvailability(tableName); 3483 } 3484 3485 @Override 3486 public boolean evaluate() throws IOException { 3487 boolean tableAvailable = getAdmin().isTableAvailable(tableName); 3488 if (tableAvailable) { 3489 try (Table table = getConnection().getTable(tableName)) { 3490 TableDescriptor htd = table.getDescriptor(); 3491 for (HRegionLocation loc : getConnection().getRegionLocator(tableName) 3492 .getAllRegionLocations()) { 3493 Scan scan = new Scan().withStartRow(loc.getRegion().getStartKey()) 3494 .withStopRow(loc.getRegion().getEndKey()).setOneRowLimit() 3495 .setMaxResultsPerColumnFamily(1).setCacheBlocks(false); 3496 for (byte[] family : htd.getColumnFamilyNames()) { 3497 scan.addFamily(family); 3498 } 3499 try (ResultScanner scanner = table.getScanner(scan)) { 3500 scanner.next(); 3501 } 3502 } 3503 } 3504 } 3505 return tableAvailable; 3506 } 3507 }; 3508 } 3509 3510 /** 3511 * Wait until no regions in transition. 3512 * @param timeout How long to wait. 3513 */ 3514 public void waitUntilNoRegionsInTransition(final long timeout) throws IOException { 3515 waitFor(timeout, predicateNoRegionsInTransition()); 3516 } 3517 3518 /** 3519 * Wait until no regions in transition. (time limit 15min) 3520 */ 3521 public void waitUntilNoRegionsInTransition() throws IOException { 3522 waitUntilNoRegionsInTransition(15 * 60000); 3523 } 3524 3525 /** 3526 * Wait until labels is ready in VisibilityLabelsCache. 3527 */ 3528 public void waitLabelAvailable(long timeoutMillis, final String... labels) { 3529 final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get(); 3530 waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() { 3531 3532 @Override 3533 public boolean evaluate() { 3534 for (String label : labels) { 3535 if (labelsCache.getLabelOrdinal(label) == 0) { 3536 return false; 3537 } 3538 } 3539 return true; 3540 } 3541 3542 @Override 3543 public String explainFailure() { 3544 for (String label : labels) { 3545 if (labelsCache.getLabelOrdinal(label) == 0) { 3546 return label + " is not available yet"; 3547 } 3548 } 3549 return ""; 3550 } 3551 }); 3552 } 3553 3554 /** 3555 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3556 * available. 3557 * @return the list of column descriptors 3558 */ 3559 public static List<ColumnFamilyDescriptor> generateColumnDescriptors() { 3560 return generateColumnDescriptors(""); 3561 } 3562 3563 /** 3564 * Create a set of column descriptors with the combination of compression, encoding, bloom codecs 3565 * available. 3566 * @param prefix family names prefix 3567 * @return the list of column descriptors 3568 */ 3569 public static List<ColumnFamilyDescriptor> generateColumnDescriptors(final String prefix) { 3570 List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); 3571 long familyId = 0; 3572 for (Compression.Algorithm compressionType : getSupportedCompressionAlgorithms()) { 3573 for (DataBlockEncoding encodingType : DataBlockEncoding.values()) { 3574 for (BloomType bloomType : BloomType.values()) { 3575 String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId); 3576 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = 3577 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(name)); 3578 columnFamilyDescriptorBuilder.setCompressionType(compressionType); 3579 columnFamilyDescriptorBuilder.setDataBlockEncoding(encodingType); 3580 columnFamilyDescriptorBuilder.setBloomFilterType(bloomType); 3581 columnFamilyDescriptors.add(columnFamilyDescriptorBuilder.build()); 3582 familyId++; 3583 } 3584 } 3585 } 3586 return columnFamilyDescriptors; 3587 } 3588 3589 /** 3590 * Get supported compression algorithms. 3591 * @return supported compression algorithms. 3592 */ 3593 public static Compression.Algorithm[] getSupportedCompressionAlgorithms() { 3594 String[] allAlgos = HFile.getSupportedCompressionAlgorithms(); 3595 List<Compression.Algorithm> supportedAlgos = new ArrayList<>(); 3596 for (String algoName : allAlgos) { 3597 try { 3598 Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName); 3599 algo.getCompressor(); 3600 supportedAlgos.add(algo); 3601 } catch (Throwable t) { 3602 // this algo is not available 3603 } 3604 } 3605 return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]); 3606 } 3607 3608 public Result getClosestRowBefore(Region r, byte[] row, byte[] family) throws IOException { 3609 Scan scan = new Scan().withStartRow(row); 3610 scan.setReadType(ReadType.PREAD); 3611 scan.setCaching(1); 3612 scan.setReversed(true); 3613 scan.addFamily(family); 3614 try (RegionScanner scanner = r.getScanner(scan)) { 3615 List<Cell> cells = new ArrayList<>(1); 3616 scanner.next(cells); 3617 if (r.getRegionInfo().isMetaRegion() && !isTargetTable(row, cells.get(0))) { 3618 return null; 3619 } 3620 return Result.create(cells); 3621 } 3622 } 3623 3624 private boolean isTargetTable(final byte[] inRow, Cell c) { 3625 String inputRowString = Bytes.toString(inRow); 3626 int i = inputRowString.indexOf(HConstants.DELIMITER); 3627 String outputRowString = Bytes.toString(c.getRowArray(), c.getRowOffset(), c.getRowLength()); 3628 int o = outputRowString.indexOf(HConstants.DELIMITER); 3629 return inputRowString.substring(0, i).equals(outputRowString.substring(0, o)); 3630 } 3631 3632 /** 3633 * Sets up {@link MiniKdc} for testing security. Uses {@link HBaseKerberosUtils} to set the given 3634 * keytab file as {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}. FYI, there is also the easier-to-use 3635 * kerby KDC server and utility for using it, 3636 * {@link org.apache.hadoop.hbase.util.SimpleKdcServerUtil}. The kerby KDC server is preferred; 3637 * less baggage. It came in in HBASE-5291. 3638 */ 3639 public MiniKdc setupMiniKdc(File keytabFile) throws Exception { 3640 Properties conf = MiniKdc.createConf(); 3641 conf.put(MiniKdc.DEBUG, true); 3642 MiniKdc kdc = null; 3643 File dir = null; 3644 // There is time lag between selecting a port and trying to bind with it. It's possible that 3645 // another service captures the port in between which'll result in BindException. 3646 boolean bindException; 3647 int numTries = 0; 3648 do { 3649 try { 3650 bindException = false; 3651 dir = new File(getDataTestDir("kdc").toUri().getPath()); 3652 kdc = new MiniKdc(conf, dir); 3653 kdc.start(); 3654 } catch (BindException e) { 3655 FileUtils.deleteDirectory(dir); // clean directory 3656 numTries++; 3657 if (numTries == 3) { 3658 LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times."); 3659 throw e; 3660 } 3661 LOG.error("BindException encountered when setting up MiniKdc. Trying again."); 3662 bindException = true; 3663 } 3664 } while (bindException); 3665 HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); 3666 return kdc; 3667 } 3668 3669 public int getNumHFiles(final TableName tableName, final byte[] family) { 3670 int numHFiles = 0; 3671 for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { 3672 numHFiles += getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, family); 3673 } 3674 return numHFiles; 3675 } 3676 3677 public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, 3678 final byte[] family) { 3679 int numHFiles = 0; 3680 for (Region region : rs.getRegions(tableName)) { 3681 numHFiles += region.getStore(family).getStorefilesCount(); 3682 } 3683 return numHFiles; 3684 } 3685 3686 public void verifyTableDescriptorIgnoreTableName(TableDescriptor ltd, TableDescriptor rtd) { 3687 assertEquals(ltd.getValues().hashCode(), rtd.getValues().hashCode()); 3688 Collection<ColumnFamilyDescriptor> ltdFamilies = Arrays.asList(ltd.getColumnFamilies()); 3689 Collection<ColumnFamilyDescriptor> rtdFamilies = Arrays.asList(rtd.getColumnFamilies()); 3690 assertEquals(ltdFamilies.size(), rtdFamilies.size()); 3691 for (Iterator<ColumnFamilyDescriptor> it = ltdFamilies.iterator(), 3692 it2 = rtdFamilies.iterator(); it.hasNext();) { 3693 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); 3694 } 3695 } 3696 3697 /** 3698 * Await the successful return of {@code condition}, sleeping {@code sleepMillis} between 3699 * invocations. 3700 */ 3701 public static void await(final long sleepMillis, final BooleanSupplier condition) 3702 throws InterruptedException { 3703 try { 3704 while (!condition.getAsBoolean()) { 3705 Thread.sleep(sleepMillis); 3706 } 3707 } catch (RuntimeException e) { 3708 if (e.getCause() instanceof AssertionError) { 3709 throw (AssertionError) e.getCause(); 3710 } 3711 throw e; 3712 } 3713 } 3714}