001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; 021import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Iterator; 030import java.util.List; 031import java.util.NavigableSet; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.Future; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.atomic.LongAdder; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.fs.FSDataOutputStream; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.ServerName; 046import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 047import org.apache.hadoop.hbase.SplitLogCounters; 048import org.apache.hadoop.hbase.StartTestingClusterOption; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.Waiter; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.RegionInfoBuilder; 054import org.apache.hadoop.hbase.client.RegionLocator; 055import org.apache.hadoop.hbase.client.Table; 056import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; 057import org.apache.hadoop.hbase.master.assignment.RegionStates; 058import org.apache.hadoop.hbase.regionserver.HRegionServer; 059import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 060import org.apache.hadoop.hbase.regionserver.Region; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 064import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 065import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 066import org.apache.hadoop.hbase.util.Threads; 067import org.apache.hadoop.hbase.wal.WAL; 068import org.apache.hadoop.hbase.wal.WALEdit; 069import org.apache.hadoop.hbase.wal.WALEditInternalHelper; 070import org.apache.hadoop.hbase.wal.WALKeyImpl; 071import org.apache.hadoop.hbase.zookeeper.ZKUtil; 072import org.junit.After; 073import org.junit.AfterClass; 074import org.junit.Before; 075import org.junit.BeforeClass; 076import org.junit.Rule; 077import org.junit.Test; 078import org.junit.rules.TestName; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 083 084/** 085 * Base class for testing distributed log splitting. 086 */ 087public abstract class AbstractTestDLS { 088 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); 089 090 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 091 092 // Start a cluster with 2 masters and 5 regionservers 093 private static final int NUM_MASTERS = 2; 094 private static final int NUM_RS = 5; 095 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); 096 097 @Rule 098 public TestName testName = new TestName(); 099 100 private TableName tableName; 101 private SingleProcessHBaseCluster cluster; 102 private HMaster master; 103 private Configuration conf; 104 105 @Rule 106 public TestName name = new TestName(); 107 108 @BeforeClass 109 public static void setup() throws Exception { 110 // Uncomment the following line if more verbosity is needed for 111 // debugging (see HBASE-12285 for details). 112 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); 113 TEST_UTIL.startMiniZKCluster(); 114 TEST_UTIL.startMiniDFSCluster(3); 115 } 116 117 @AfterClass 118 public static void tearDown() throws Exception { 119 TEST_UTIL.shutdownMiniCluster(); 120 } 121 122 protected abstract String getWalProvider(); 123 124 private void startCluster(int numRS) throws Exception { 125 SplitLogCounters.resetCounters(); 126 LOG.info("Starting cluster"); 127 conf.setLong("hbase.splitlog.max.resubmit", 0); 128 // Make the failure test faster 129 conf.setInt("zookeeper.recovery.retry", 0); 130 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); 131 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing 132 conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); 133 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 134 conf.set("hbase.wal.provider", getWalProvider()); 135 StartTestingClusterOption option = 136 StartTestingClusterOption.builder().numMasters(NUM_MASTERS).numRegionServers(numRS).build(); 137 TEST_UTIL.startMiniHBaseCluster(option); 138 cluster = TEST_UTIL.getHBaseCluster(); 139 LOG.info("Waiting for active/ready master"); 140 cluster.waitForActiveAndReadyMaster(); 141 master = cluster.getMaster(); 142 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 143 @Override 144 public boolean evaluate() throws Exception { 145 return cluster.getLiveRegionServerThreads().size() >= numRS; 146 } 147 }); 148 } 149 150 @Before 151 public void before() throws Exception { 152 conf = TEST_UTIL.getConfiguration(); 153 tableName = TableName.valueOf(testName.getMethodName()); 154 } 155 156 @After 157 public void after() throws Exception { 158 TEST_UTIL.shutdownMiniHBaseCluster(); 159 TEST_UTIL.getTestFileSystem().delete(CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration()), 160 true); 161 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); 162 } 163 164 @Test 165 public void testMasterStartsUpWithLogSplittingWork() throws Exception { 166 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); 167 startCluster(NUM_RS); 168 169 int numRegionsToCreate = 40; 170 int numLogLines = 1000; 171 // turn off load balancing to prevent regions from moving around otherwise 172 // they will consume recovered.edits 173 master.balanceSwitch(false); 174 175 try (Table ht = installTable(numRegionsToCreate)) { 176 HRegionServer hrs = findRSToKill(false); 177 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 178 makeWAL(hrs, regions, numLogLines, 100); 179 180 // abort master 181 abortMaster(cluster); 182 183 // abort RS 184 LOG.info("Aborting region server: " + hrs.getServerName()); 185 hrs.abort("testing"); 186 187 // wait for abort completes 188 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 189 @Override 190 public boolean evaluate() throws Exception { 191 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; 192 } 193 }); 194 195 Thread.sleep(2000); 196 LOG.info("Current Open Regions:" + HBaseTestingUtil.getAllOnlineRegions(cluster).size()); 197 198 // wait for abort completes 199 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { 200 @Override 201 public boolean evaluate() throws Exception { 202 return (HBaseTestingUtil.getAllOnlineRegions(cluster).size() >= (numRegionsToCreate + 1)); 203 } 204 }); 205 206 LOG.info("Current Open Regions After Master Node Starts Up:" 207 + HBaseTestingUtil.getAllOnlineRegions(cluster).size()); 208 209 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); 210 } 211 } 212 213 @Test 214 public void testThreeRSAbort() throws Exception { 215 LOG.info("testThreeRSAbort"); 216 int numRegionsToCreate = 40; 217 int numRowsPerRegion = 100; 218 219 startCluster(NUM_RS); // NUM_RS=6. 220 221 try (Table table = installTable(numRegionsToCreate)) { 222 populateDataInTable(numRowsPerRegion); 223 224 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 225 assertEquals(NUM_RS, rsts.size()); 226 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); 227 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); 228 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); 229 230 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { 231 232 @Override 233 public boolean evaluate() throws Exception { 234 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; 235 } 236 237 @Override 238 public String explainFailure() throws Exception { 239 return "Timed out waiting for server aborts."; 240 } 241 }); 242 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 243 int rows; 244 try { 245 rows = TEST_UTIL.countRows(table); 246 } catch (Exception e) { 247 Threads.printThreadInfo(System.out, "Thread dump before fail"); 248 throw e; 249 } 250 assertEquals(numRegionsToCreate * numRowsPerRegion, rows); 251 } 252 } 253 254 @Test 255 public void testDelayedDeleteOnFailure() throws Exception { 256 if ( 257 !this.conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, 258 HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK) 259 ) { 260 // This test depends on zk coordination.... 261 return; 262 } 263 LOG.info("testDelayedDeleteOnFailure"); 264 startCluster(1); 265 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); 266 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 267 final Path rootLogDir = 268 new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME); 269 final Path logDir = new Path(rootLogDir, ServerName.valueOf("x", 1, 1).toString()); 270 fs.mkdirs(logDir); 271 ExecutorService executor = null; 272 try { 273 final Path corruptedLogFile = new Path(logDir, "x"); 274 FSDataOutputStream out; 275 out = fs.create(corruptedLogFile); 276 out.write(0); 277 out.write(Bytes.toBytes("corrupted bytes")); 278 out.close(); 279 ZKSplitLogManagerCoordination coordination = 280 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) 281 .getSplitLogManagerCoordination(); 282 coordination.setIgnoreDeleteForTesting(true); 283 executor = Executors.newSingleThreadExecutor(); 284 Runnable runnable = new Runnable() { 285 @Override 286 public void run() { 287 try { 288 // since the logDir is a fake, corrupted one, so the split log worker 289 // will finish it quickly with error, and this call will fail and throw 290 // an IOException. 291 slm.splitLogDistributed(logDir); 292 } catch (IOException ioe) { 293 try { 294 assertTrue(fs.exists(corruptedLogFile)); 295 // this call will block waiting for the task to be removed from the 296 // tasks map which is not going to happen since ignoreZKDeleteForTesting 297 // is set to true, until it is interrupted. 298 slm.splitLogDistributed(logDir); 299 } catch (IOException e) { 300 assertTrue(Thread.currentThread().isInterrupted()); 301 return; 302 } 303 fail("did not get the expected IOException from the 2nd call"); 304 } 305 fail("did not get the expected IOException from the 1st call"); 306 } 307 }; 308 Future<?> result = executor.submit(runnable); 309 try { 310 result.get(2000, TimeUnit.MILLISECONDS); 311 } catch (TimeoutException te) { 312 // it is ok, expected. 313 } 314 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); 315 executor.shutdownNow(); 316 executor = null; 317 318 // make sure the runnable is finished with no exception thrown. 319 result.get(); 320 } finally { 321 if (executor != null) { 322 // interrupt the thread in case the test fails in the middle. 323 // it has no effect if the thread is already terminated. 324 executor.shutdownNow(); 325 } 326 fs.delete(logDir, true); 327 } 328 } 329 330 private Table installTable(int nrs) throws Exception { 331 return installTable(nrs, 0); 332 } 333 334 private Table installTable(int nrs, int existingRegions) throws Exception { 335 // Create a table with regions 336 byte[] family = Bytes.toBytes("family"); 337 LOG.info("Creating table with " + nrs + " regions"); 338 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); 339 int numRegions = -1; 340 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 341 numRegions = r.getStartKeys().length; 342 } 343 assertEquals(nrs, numRegions); 344 LOG.info("Waiting for no more RIT\n"); 345 blockUntilNoRIT(); 346 // disable-enable cycle to get rid of table's dead regions left behind 347 // by createMultiRegions 348 assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); 349 LOG.debug("Disabling table\n"); 350 TEST_UTIL.getAdmin().disableTable(tableName); 351 LOG.debug("Waiting for no more RIT\n"); 352 blockUntilNoRIT(); 353 NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster); 354 LOG.debug("Verifying only catalog region is assigned\n"); 355 if (regions.size() != 1) { 356 for (String oregion : regions) { 357 LOG.debug("Region still online: " + oregion); 358 } 359 } 360 assertEquals(1 + existingRegions, regions.size()); 361 LOG.debug("Enabling table\n"); 362 TEST_UTIL.getAdmin().enableTable(tableName); 363 LOG.debug("Waiting for no more RIT\n"); 364 blockUntilNoRIT(); 365 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); 366 regions = HBaseTestingUtil.getAllOnlineRegions(cluster); 367 assertEquals(numRegions + 1 + existingRegions, regions.size()); 368 return table; 369 } 370 371 void populateDataInTable(int nrows) throws Exception { 372 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 373 assertEquals(NUM_RS, rsts.size()); 374 375 for (RegionServerThread rst : rsts) { 376 HRegionServer hrs = rst.getRegionServer(); 377 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 378 for (RegionInfo hri : hris) { 379 if (hri.getTable().isSystemTable()) { 380 continue; 381 } 382 LOG.debug( 383 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); 384 Region region = hrs.getOnlineRegion(hri.getRegionName()); 385 assertTrue(region != null); 386 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); 387 } 388 } 389 } 390 391 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) 392 throws IOException { 393 makeWAL(hrs, regions, num_edits, edit_size, true); 394 } 395 396 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, 397 boolean cleanShutdown) throws IOException { 398 // remove root and meta region 399 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); 400 401 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { 402 RegionInfo regionInfo = iter.next(); 403 if (regionInfo.getTable().isSystemTable()) { 404 iter.remove(); 405 } 406 } 407 byte[] value = new byte[editSize]; 408 409 List<RegionInfo> hris = new ArrayList<>(); 410 for (RegionInfo region : regions) { 411 if (region.getTable() != tableName) { 412 continue; 413 } 414 hris.add(region); 415 } 416 LOG.info("Creating wal edits across " + hris.size() + " regions."); 417 for (int i = 0; i < editSize; i++) { 418 value[i] = (byte) ('a' + (i % 26)); 419 } 420 int n = hris.size(); 421 int[] counts = new int[n]; 422 // sync every ~30k to line up with desired wal rolls 423 final int syncEvery = 30 * 1024 / editSize; 424 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 425 if (n > 0) { 426 for (int i = 0; i < numEdits; i += 1) { 427 WALEdit e = new WALEdit(); 428 RegionInfo curRegionInfo = hris.get(i % n); 429 WAL log = hrs.getWAL(curRegionInfo); 430 byte[] startRow = curRegionInfo.getStartKey(); 431 if (startRow == null || startRow.length == 0) { 432 startRow = new byte[] { 0, 0, 0, 0, 1 }; 433 } 434 byte[] row = Bytes.incrementBytes(startRow, counts[i % n]); 435 row = Arrays.copyOfRange(row, 3, 8); // use last 5 bytes because 436 // HBaseTestingUtility.createMultiRegions use 5 bytes key 437 byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); 438 WALEditInternalHelper.addExtendedCell(e, 439 new KeyValue(row, COLUMN_FAMILY, qualifier, EnvironmentEdgeManager.currentTime(), value)); 440 log.appendData(curRegionInfo, new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), 441 tableName, EnvironmentEdgeManager.currentTime(), mvcc), e); 442 if (0 == i % syncEvery) { 443 log.sync(); 444 } 445 counts[i % n] += 1; 446 } 447 } 448 // done as two passes because the regions might share logs. shutdown is idempotent, but sync 449 // will cause errors if done after. 450 for (RegionInfo info : hris) { 451 WAL log = hrs.getWAL(info); 452 log.sync(); 453 } 454 if (cleanShutdown) { 455 for (RegionInfo info : hris) { 456 WAL log = hrs.getWAL(info); 457 log.shutdown(); 458 } 459 } 460 for (int i = 0; i < n; i++) { 461 LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits"); 462 } 463 return; 464 } 465 466 private void blockUntilNoRIT() throws Exception { 467 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 468 } 469 470 private void putData(Region region, byte[] startRow, int numRows, byte[] qf, byte[]... families) 471 throws IOException { 472 for (int i = 0; i < numRows; i++) { 473 Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); 474 for (byte[] family : families) { 475 put.addColumn(family, qf, null); 476 } 477 region.put(put); 478 } 479 } 480 481 private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems) 482 throws InterruptedException { 483 long curt = EnvironmentEdgeManager.currentTime(); 484 long endt = curt + timems; 485 while (curt < endt) { 486 if (ctr.sum() == oldval) { 487 Thread.sleep(100); 488 curt = EnvironmentEdgeManager.currentTime(); 489 } else { 490 assertEquals(newval, ctr.sum()); 491 return; 492 } 493 } 494 fail(); 495 } 496 497 private void abortMaster(SingleProcessHBaseCluster cluster) throws InterruptedException { 498 for (MasterThread mt : cluster.getLiveMasterThreads()) { 499 if (mt.getMaster().isActiveMaster()) { 500 mt.getMaster().abort("Aborting for tests", new Exception("Trace info")); 501 mt.join(); 502 break; 503 } 504 } 505 LOG.debug("Master is aborted"); 506 } 507 508 /** 509 * Find a RS that has regions of a table. 510 * @param hasMetaRegion when true, the returned RS has hbase:meta region as well 511 */ 512 private HRegionServer findRSToKill(boolean hasMetaRegion) throws Exception { 513 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); 514 List<RegionInfo> regions = null; 515 HRegionServer hrs = null; 516 517 for (RegionServerThread rst : rsts) { 518 hrs = rst.getRegionServer(); 519 while (rst.isAlive() && !hrs.isOnline()) { 520 Thread.sleep(100); 521 } 522 if (!rst.isAlive()) { 523 continue; 524 } 525 boolean isCarryingMeta = false; 526 boolean foundTableRegion = false; 527 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); 528 for (RegionInfo region : regions) { 529 if (region.isMetaRegion()) { 530 isCarryingMeta = true; 531 } 532 if (region.getTable() == tableName) { 533 foundTableRegion = true; 534 } 535 if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) { 536 break; 537 } 538 } 539 if (isCarryingMeta && hasMetaRegion) { 540 // clients ask for a RS with META 541 if (!foundTableRegion) { 542 HRegionServer destRS = hrs; 543 // the RS doesn't have regions of the specified table so we need move one to this RS 544 List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName); 545 RegionInfo hri = tableRegions.get(0); 546 TEST_UTIL.getAdmin().move(hri.getEncodedNameAsBytes(), destRS.getServerName()); 547 // wait for region move completes 548 RegionStates regionStates = 549 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); 550 TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() { 551 @Override 552 public boolean evaluate() throws Exception { 553 ServerName sn = regionStates.getRegionServerOfRegion(hri); 554 return (sn != null && sn.equals(destRS.getServerName())); 555 } 556 }); 557 } 558 return hrs; 559 } else if (hasMetaRegion || isCarryingMeta) { 560 continue; 561 } 562 if (foundTableRegion) { 563 break; 564 } 565 } 566 567 return hrs; 568 } 569}