/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for tests that drive master procedures: restarting the procedure executor,
 * waiting for master failover, creating/validating tables and column families, loading rows, and
 * exercising recovery/rollback with an executor restart between steps.
 */
@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  /** Utility class; not instantiable. */
  private MasterProcedureTestingUtility() {
  }

  /**
   * Restart the given procedure executor through
   * {@link ProcedureTestingUtility#restart(ProcedureExecutor, boolean, boolean, Callable, Callable, Callable)},
   * supplying three callbacks that stop the assignment manager, re-create RIT/server state before
   * the workers start, and finally rejoin the cluster.
   * @param procExec executor to restart; its environment must be backed by an {@link HMaster}
   * @throws Exception whatever the restart or one of the callbacks throws
   */
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
    throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          master.setServiceStarted(false);
          AssignmentManager am = env.getAssignmentManager();
          // try to simulate a master restart by removing the ServerManager states about seqIDs
          for (RegionState regionState : am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // just follow the same way with HMaster.finishActiveMasterInitialization. See the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          // create server state node, to simulate master start up
          env.getMasterServices().getServerManager().getOnlineServersList()
            .forEach(am.getRegionStates()::createServer);
          master.setServiceStarted(true);
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            am.wakeMetaLoadedEvent();
            master.setInitialized(true);
          } catch (Exception e) {
            // Deliberately best effort: the failure is only logged and the restart continues.
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  // Master failover utils
  // ==========================================================================

  /**
   * Kill the current master of the mini cluster and block until another master has taken over.
   * @param testUtil testing utility that owns the mini cluster
   * @see #waitBackupMaster(HBaseTestingUtil, HMaster)
   */
  public static void masterFailover(final HBaseTestingUtil testUtil) throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(oldMaster.getServerName());

    // Wait the secondary
    waitBackupMaster(testUtil, oldMaster);
  }

  /**
   * Poll every 250 ms until the cluster reports a master that is not {@code oldMaster}, then keep
   * polling until that master is both active and initialized. There is no timeout; callers rely
   * on the surrounding test timeout.
   * @param testUtil  testing utility that owns the mini cluster
   * @param oldMaster the master instance that is expected to be replaced
   */
  public static void waitBackupMaster(final HBaseTestingUtil testUtil, final HMaster oldMaster)
    throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  // Table Helpers
  // ==========================================================================

  /**
   * Build a table descriptor for {@code tableName} with one default column family per name.
   * @param tableName name of the table
   * @param family    column family names
   * @return the built descriptor
   */
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (String familyName : family) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName));
    }
    return builder.build();
  }

  /**
   * Create a table by submitting a {@link CreateTableProcedure} and waiting for it; asserts that
   * the procedure did not fail.
   * @param procExec  executor used to run the procedure
   * @param tableName name of the table to create
   * @param splitKeys split keys handed to {@link ModifyRegionUtils#createRegionInfos}
   * @param family    column family names
   * @return the region infos that were passed to the procedure
   */
  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
    final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

  /**
   * Same as {@link #validateTableCreation(HMaster, TableName, RegionInfo[], boolean, String...)}
   * with {@code hasFamilyDirs} set to {@code true}.
   */
  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  /**
   * Assert that a table was created as expected: the filesystem layout holds exactly the given
   * regions and families, meta holds the expected number of assigned regions, and the stored
   * table descriptor holds exactly the given families plus the configured store file tracker.
   * @param master        master to inspect
   * @param tableName     table to validate
   * @param regions       regions the table is expected to have
   * @param hasFamilyDirs when {@code true} every family directory must exist; when {@code false}
   *                      a missing family directory is only logged
   * @param family        expected column family names
   */
  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
    // Starts as "all region dirs on disk"; every expected one is removed, leftovers are errors.
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (RegionInfo region : regions) {
      Path regionDir = new Path(tableDir, region.getEncodedName());
      assertTrue(region + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (String familyName : family) {
        final Path familyDir = new Path(regionDir, familyName);
        if (hasFamilyDirs) {
          assertTrue(familyName + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn("{} family dir does not exist", familyName);
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (String familyName : family) {
      assertTrue("family not found " + familyName,
        htd.getColumnFamily(Bytes.toBytes(familyName)) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());

    // checks store file tracker impl has been properly set in htd
    String storeFileTrackerImpl =
      StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration());
    assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL));
  }

  /**
   * Assert that a table is gone from the filesystem, from meta and from the table descriptors.
   * @param master    master to inspect
   * @param tableName table expected to be deleted
   */
  public static void validateTableDeletion(final HMaster master, final TableName tableName)
    throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null);
  }

  /**
   * Scan meta for {@code tableName} and count the region locations that have a server assigned.
   * Rows whose default region is offline or split are skipped; the scan stops at the first row
   * that belongs to a different table.
   * @return number of locations with a non-null server name and address
   */
  private static int countMetaRegions(final HMaster master, final TableName tableName)
    throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = CatalogFamilyFormat.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in {}", rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegion().getTable().equals(tableName)) {
          // Returning false ends the scan.
          return false;
        }
        if (l.getRegion().isOffline() || l.getRegion().isSplit()) {
          return true;
        }

        for (HRegionLocation location : list.getRegionLocations()) {
          if (location == null) {
            continue;
          }
          ServerName serverName = location.getServerName();
          // Make sure that regions are assigned to server
          if (serverName != null && serverName.getAddress() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  /** Assert that the table state manager reports {@code tableName} as ENABLED. */
  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    // assertEquals (rather than assertTrue(equals)) so a failure reports the actual state.
    assertEquals(TableState.State.ENABLED, tsm.getTableState(tableName).getState());
  }

  /** Assert that the table state manager reports {@code tableName} as DISABLED. */
  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertEquals(TableState.State.DISABLED, tsm.getTableState(tableName).getState());
  }

  /** Assert that the table descriptor of {@code tableName} exists and contains {@code family}. */
  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(Bytes.toBytes(family)));
  }

  /**
   * Assert that {@code family} is absent both from the table descriptor and from every region
   * directory of {@code tableName} on the filesystem.
   */
  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(Bytes.toBytes(family)));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  /**
   * Assert that the stored descriptor of {@code family} compares equal (per
   * {@link ColumnFamilyDescriptor#COMPARATOR}) to {@code columnDescriptor}.
   */
  public static void validateColumnFamilyModification(final HMaster master,
    final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
    throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(Bytes.toBytes(family));
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

  /**
   * Write {@code rows} rows into the table: first one row prefixed with each split key, then the
   * remaining rows under MD5-hex keys. Every row carries the same value in qualifier {@code q} of
   * each family and is written with {@link Durability#SKIP_WAL}.
   * @param connection connection used to obtain the mutator
   * @param tableName  table to load
   * @param rows       total number of rows to write; must be at least {@code splitKeys.length}
   * @param splitKeys  split keys used as row-key prefixes
   * @param sfamilies  column family names to write to
   */
  public static void loadData(final Connection connection, final TableName tableName, int rows,
    final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    // Ensure one row per region. Checked before the mutator is created so nothing is left open
    // when the precondition fails.
    assertTrue(rows >= splitKeys.length);

    // try-with-resources: the mutator was previously never closed.
    try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
      for (byte[] k : splitKeys) {
        byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k);
        byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
        mutator.mutate(createPut(families, key, value));
        rows--;
      }

      // Add other extra rows. more rows, more files
      while (rows-- > 0) {
        byte[] value =
          Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows));
        byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
        mutator.mutate(createPut(families, key, value));
      }
      mutator.flush();
    }
  }

  /** Build a SKIP_WAL put that stores {@code value} under qualifier {@code q} of each family. */
  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family : families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  // Procedure Helpers
  // ==========================================================================

  /** Return the nonce group of the master's async cluster connection nonce generator. */
  public static long generateNonceGroup(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup();
  }

  /** Return a new nonce from the master's async cluster connection nonce generator. */
  public static long generateNonce(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Return the current state id of {@code proc} when it is a {@link StateMachineProcedure},
   * otherwise {@code fallback} (this includes a {@code null} procedure).
   */
  private static int currentStateId(final Procedure<?> proc, final int fallback) {
    return proc instanceof StateMachineProcedure
      ? ((StateMachineProcedure<?, ?>) proc).getCurrentStateId()
      : fallback;
  }

  /**
   * Run through all procedure flow states TWICE while also restarting procedure executor at each
   * step; i.e force a reread of procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when a procedure executes all flow steps from start to finish.
   * @param procExec          executor running the procedure
   * @param procId            id of the procedure under test
   * @param lastStep          state id at which to stop restarting
   * @param expectExecRunning expected value of {@code procExec.isRunning()} on return
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());

    // Restart the executor and execute the step twice
    // execute step N - kill before store update
    // restart executor/store
    // execute step N - save on store
    // NOTE: currently we make assumption that states/ steps are sequential. There are already
    // instances of a procedures which skip (don't use) intermediate states/ steps. In future,
    // intermediate states/ steps can be added with ordinal greater than lastStep. If and when
    // that happens the states can not be treated as sequential steps and the condition in
    // following while loop needs to be changed. We can use equals/ not equals operator to check
    // if the procedure has reached the user specified state. But there is a possibility that
    // while loop may not get the control back exactly when the procedure is in lastStep. Proper
    // fix would be get all visited states by the procedure and then check if user specified
    // state is in that list. Current assumption of sequential progression of steps/ states is
    // made at multiple places so we can keep while condition below for simplicity.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = currentStateId(proc, 0);
    while (stepNum != lastStep) {
      LOG.info("Restart {} exec state={}", stepNum, proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      // Procedures without a state machine are simply counted one restart at a time.
      stepNum = currentStateId(proc, stepNum + 1);
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting procedure executor at each
   * step; i.e force a reread of procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Executes hook for each step twice
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when the order in which flow steps are executed is not start to
   * finish; where the procedure may vary the flow steps dependent on circumstance found.
   * @param procExec executor running the procedure
   * @param procId   id of the procedure under test
   * @param hook     optional hook run before each restart; may be {@code null}
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
    throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart {} exec state={}", i, procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  /**
   * Same as {@link #testRecoveryAndDoubleExecution(ProcedureExecutor, long, StepHook)} without a
   * hook.
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step
   */
  @FunctionalInterface
  public interface StepHook {
    /**
     * @param step Step no. at which this will be executed
     * @return false if test should fail otherwise true
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an
   * abort() is injected. If the procedure implement abort() this should result in rollback being
   * triggered. Each rollback step is called twice, by restarting the executor after every step. At
   * the end of this call the procedure should be finished and rolled back. This method assert on
   * the procedure being terminated with an AbortException.
   */
  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  /**
   * Variant of {@link #testRollbackAndDoubleExecution(ProcedureExecutor, long, int)} that can
   * additionally wait for all other procedures to finish before asserting the result.
   * @param procExec          executor running the procedure
   * @param procId            id of the procedure under test
   * @param lastStep          state id up to which the procedure is executed before the abort
   * @param waitForAsyncProcs when {@code true}, disable the kill-before-store-update toggle,
   *                          restart the executor if it was stopped, and wait until no procedure
   *                          is running
   */
  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    // rollback step N - kill before store update
    // restart executor/store
    // rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart {} rollback state: {}", i, procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes there are other procedures still executing (including asynchronously spawned by
      // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before
      // store update. Let all pending procedures finish normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due"
            + " to KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an
   * abort() is injected. If the procedure implement abort() this should result in rollback being
   * triggered. At the end of this call the procedure should be finished and rolled back. This
   * method assert on the procedure being terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restart the ProcedureExecutor and inject an abort to the specified procedure. If the procedure
   * implement abort() this should result in rollback being triggered. At the end of this call the
   * procedure should be finished and rolled back, if abort is implemented
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
    long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId={}", procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

  /**
   * Check whether {@code tableName} exists, using a short-lived {@link Admin} from {@code conn}.
   * @return {@code true} when the admin reports the table as existing
   */
  public static boolean tableExists(Connection conn, TableName tableName) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(tableName);
    }
  }

  /**
   * Executor listener that calls {@code abort} on procedures as they are loaded. When no procedure
   * id has been registered through {@link #addProcId(long)} every loaded procedure is aborted;
   * otherwise only the registered ids are.
   */
  public static class InjectAbortOnLoadListener
    implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    // null means "no filter": abort every procedure that gets loaded.
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    /** Restrict aborting to registered ids and register {@code procId}. */
    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) {
      /* no-op */
    }

    @Override
    public void procedureFinished(long procId) {
      /* no-op */
    }
  }
}