001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.TreeSet; 028import java.util.concurrent.Callable; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HRegionLocation; 035import org.apache.hadoop.hbase.MetaTableAccessor; 036import org.apache.hadoop.hbase.MiniHBaseCluster; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.BufferedMutator; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Durability; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.client.TableState; 052import org.apache.hadoop.hbase.master.HMaster; 053import org.apache.hadoop.hbase.master.RegionState; 054import org.apache.hadoop.hbase.master.TableStateManager; 055import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 056import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 057import org.apache.hadoop.hbase.procedure2.Procedure; 058import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 059import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 060import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 061import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.CommonFSUtils; 064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 065import org.apache.hadoop.hbase.util.FSUtils; 066import org.apache.hadoop.hbase.util.MD5Hash; 067import org.apache.hadoop.hbase.util.ModifyRegionUtils; 068import org.apache.yetus.audience.InterfaceAudience; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072@InterfaceAudience.Private 073public class MasterProcedureTestingUtility { 074 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class); 075 076 private MasterProcedureTestingUtility() { 077 } 078 079 public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec) 080 throws Exception { 081 final MasterProcedureEnv env = procExec.getEnvironment(); 082 final HMaster master = (HMaster) env.getMasterServices(); 083 ProcedureTestingUtility.restart(procExec, true, true, 084 // stop services 085 new Callable<Void>() { 086 @Override 087 public Void call() throws Exception { 088 master.setServiceStarted(false); 089 AssignmentManager am = env.getAssignmentManager(); 090 // try to simulate a master restart by removing the ServerManager states about seqIDs 091 for (RegionState regionState : am.getRegionStates().getRegionStates()) { 092 env.getMasterServices().getServerManager().removeRegion(regionState.getRegion()); 093 } 094 am.stop(); 095 master.setInitialized(false); 096 return null; 097 } 098 }, 099 // setup RIT before starting workers 100 new Callable<Void>() { 101 102 @Override 103 public Void call() throws Exception { 104 AssignmentManager am = env.getAssignmentManager(); 105 am.start(); 106 // just follow the same way with HMaster.finishActiveMasterInitialization. See the 107 // comments there 108 am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess()) 109 .filter(p -> p instanceof TransitRegionStateProcedure) 110 .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList())); 111 // create server state node, to simulate master start up 112 env.getMasterServices().getServerManager().getOnlineServersList() 113 .forEach(am.getRegionStates()::createServer); 114 master.setServiceStarted(true); 115 return null; 116 } 117 }, 118 // restart services 119 new Callable<Void>() { 120 @Override 121 public Void call() throws Exception { 122 AssignmentManager am = env.getAssignmentManager(); 123 try { 124 am.joinCluster(); 125 am.wakeMetaLoadedEvent(); 126 master.setInitialized(true); 127 } catch (Exception e) { 128 LOG.warn("Failed to load meta", e); 129 } 130 return null; 131 } 132 }); 133 } 134 135 // ========================================================================== 136 // Master failover utils 137 // ========================================================================== 138 public static void masterFailover(final HBaseTestingUtility testUtil) throws Exception { 139 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 140 141 // Kill the master 142 HMaster oldMaster = cluster.getMaster(); 143 cluster.killMaster(cluster.getMaster().getServerName()); 144 145 // Wait the secondary 146 waitBackupMaster(testUtil, oldMaster); 147 } 148 149 public static void waitBackupMaster(final HBaseTestingUtility testUtil, final HMaster oldMaster) 150 throws Exception { 151 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 152 153 HMaster newMaster = cluster.getMaster(); 154 while (newMaster == null || newMaster == oldMaster) { 155 Thread.sleep(250); 156 newMaster = cluster.getMaster(); 157 } 158 159 while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { 160 Thread.sleep(250); 161 } 162 } 163 164 // ========================================================================== 165 // Table Helpers 166 // ========================================================================== 167 public static TableDescriptor createHTD(final TableName tableName, final String... family) { 168 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 169 for (int i = 0; i < family.length; ++i) { 170 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i])); 171 } 172 return builder.build(); 173 } 174 175 public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, 176 final TableName tableName, final byte[][] splitKeys, String... family) throws IOException { 177 TableDescriptor htd = createHTD(tableName, family); 178 RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys); 179 long procId = ProcedureTestingUtility.submitAndWait(procExec, 180 new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); 181 ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); 182 return regions; 183 } 184 185 public static void validateTableCreation(final HMaster master, final TableName tableName, 186 final RegionInfo[] regions, String... family) throws IOException { 187 validateTableCreation(master, tableName, regions, true, family); 188 } 189 190 public static void validateTableCreation(final HMaster master, final TableName tableName, 191 final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { 192 // check filesystem 193 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 194 final Path tableDir = 195 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 196 assertTrue(fs.exists(tableDir)); 197 CommonFSUtils.logFileSystemState(fs, tableDir, LOG); 198 List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir); 199 for (int i = 0; i < regions.length; ++i) { 200 Path regionDir = new Path(tableDir, regions[i].getEncodedName()); 201 assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); 202 assertTrue(unwantedRegionDirs.remove(regionDir)); 203 List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); 204 for (int j = 0; j < family.length; ++j) { 205 final Path familyDir = new Path(regionDir, family[j]); 206 if (hasFamilyDirs) { 207 assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); 208 assertTrue(allFamilyDirs.remove(familyDir)); 209 } else { 210 // TODO: WARN: Modify Table/Families does not create a family dir 211 if (!fs.exists(familyDir)) { 212 LOG.warn(family[j] + " family dir does not exist"); 213 } 214 allFamilyDirs.remove(familyDir); 215 } 216 } 217 assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); 218 } 219 assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty()); 220 LOG.debug("Table directory layout is as expected."); 221 222 // check meta 223 assertTrue(tableExists(master.getConnection(), tableName)); 224 assertEquals(regions.length, countMetaRegions(master, tableName)); 225 226 // check htd 227 TableDescriptor htd = master.getTableDescriptors().get(tableName); 228 assertTrue("table descriptor not found", htd != null); 229 for (int i = 0; i < family.length; ++i) { 230 assertTrue("family not found " + family[i], 231 htd.getColumnFamily(Bytes.toBytes(family[i])) != null); 232 } 233 assertEquals(family.length, htd.getColumnFamilyCount()); 234 235 // checks store file tracker impl has been properly set in htd 236 String storeFileTrackerImpl = 237 StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration()); 238 assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL)); 239 } 240 241 public static void validateTableDeletion(final HMaster master, final TableName tableName) 242 throws IOException { 243 // check filesystem 244 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 245 final Path tableDir = 246 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 247 assertFalse(fs.exists(tableDir)); 248 249 // check meta 250 assertFalse(tableExists(master.getConnection(), tableName)); 251 assertEquals(0, countMetaRegions(master, tableName)); 252 253 // check htd 254 assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null); 255 } 256 257 private static int countMetaRegions(final HMaster master, final TableName tableName) 258 throws IOException { 259 final AtomicInteger actualRegCount = new AtomicInteger(0); 260 final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 261 @Override 262 public boolean visit(Result rowResult) throws IOException { 263 RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); 264 if (list == null) { 265 LOG.warn("No serialized RegionInfo in " + rowResult); 266 return true; 267 } 268 HRegionLocation l = list.getRegionLocation(); 269 if (l == null) { 270 return true; 271 } 272 if (!l.getRegionInfo().getTable().equals(tableName)) { 273 return false; 274 } 275 if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; 276 HRegionLocation[] locations = list.getRegionLocations(); 277 for (HRegionLocation location : locations) { 278 if (location == null) continue; 279 ServerName serverName = location.getServerName(); 280 // Make sure that regions are assigned to server 281 if (serverName != null && serverName.getAddress() != null) { 282 actualRegCount.incrementAndGet(); 283 } 284 } 285 return true; 286 } 287 }; 288 MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); 289 return actualRegCount.get(); 290 } 291 292 public static void validateTableIsEnabled(final HMaster master, final TableName tableName) 293 throws IOException { 294 TableStateManager tsm = master.getTableStateManager(); 295 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED)); 296 } 297 298 public static void validateTableIsDisabled(final HMaster master, final TableName tableName) 299 throws IOException { 300 TableStateManager tsm = master.getTableStateManager(); 301 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED)); 302 } 303 304 public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName, 305 final String family) throws IOException { 306 TableDescriptor htd = master.getTableDescriptors().get(tableName); 307 assertTrue(htd != null); 308 309 assertTrue(htd.hasColumnFamily(family.getBytes())); 310 } 311 312 public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName, 313 final String family) throws IOException { 314 // verify htd 315 TableDescriptor htd = master.getTableDescriptors().get(tableName); 316 assertTrue(htd != null); 317 assertFalse(htd.hasColumnFamily(family.getBytes())); 318 319 // verify fs 320 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 321 final Path tableDir = 322 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 323 for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) { 324 final Path familyDir = new Path(regionDir, family); 325 assertFalse(family + " family dir should not exist", fs.exists(familyDir)); 326 } 327 } 328 329 public static void validateColumnFamilyModification(final HMaster master, 330 final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor) 331 throws IOException { 332 TableDescriptor htd = master.getTableDescriptors().get(tableName); 333 assertTrue(htd != null); 334 335 ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes()); 336 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor)); 337 } 338 339 public static void loadData(final Connection connection, final TableName tableName, int rows, 340 final byte[][] splitKeys, final String... sfamilies) throws IOException { 341 byte[][] families = new byte[sfamilies.length][]; 342 for (int i = 0; i < families.length; ++i) { 343 families[i] = Bytes.toBytes(sfamilies[i]); 344 } 345 346 BufferedMutator mutator = connection.getBufferedMutator(tableName); 347 348 // Ensure one row per region 349 assertTrue(rows >= splitKeys.length); 350 for (byte[] k : splitKeys) { 351 byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k); 352 byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); 353 mutator.mutate(createPut(families, key, value)); 354 rows--; 355 } 356 357 // Add other extra rows. more rows, more files 358 while (rows-- > 0) { 359 byte[] value = 360 Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows)); 361 byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); 362 mutator.mutate(createPut(families, key, value)); 363 } 364 mutator.flush(); 365 } 366 367 private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { 368 byte[] q = Bytes.toBytes("q"); 369 Put put = new Put(key); 370 put.setDurability(Durability.SKIP_WAL); 371 for (byte[] family : families) { 372 put.addColumn(family, q, value); 373 } 374 return put; 375 } 376 377 // ========================================================================== 378 // Procedure Helpers 379 // ========================================================================== 380 public static long generateNonceGroup(final HMaster master) { 381 return master.getClusterConnection().getNonceGenerator().getNonceGroup(); 382 } 383 384 public static long generateNonce(final HMaster master) { 385 return master.getClusterConnection().getNonceGenerator().newNonce(); 386 } 387 388 /** 389 * Run through all procedure flow states TWICE while also restarting procedure executor at each 390 * step; i.e force a reread of procedure store. 391 * <p> 392 * It does 393 * <ol> 394 * <li>Execute step N - kill the executor before store update 395 * <li>Restart executor/store 396 * <li>Execute step N - and then save to store 397 * </ol> 398 * <p> 399 * This is a good test for finding state that needs persisting and steps that are not idempotent. 400 * Use this version of the test when a procedure executes all flow steps from start to finish. 401 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long) 402 */ 403 public static void testRecoveryAndDoubleExecution( 404 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep, 405 final boolean expectExecRunning) throws Exception { 406 ProcedureTestingUtility.waitProcedure(procExec, procId); 407 assertEquals(false, procExec.isRunning()); 408 409 // Restart the executor and execute the step twice 410 // execute step N - kill before store update 411 // restart executor/store 412 // execute step N - save on store 413 // NOTE: currently we make assumption that states/ steps are sequential. There are already 414 // instances of a procedures which skip (don't use) intermediate states/ steps. In future, 415 // intermediate states/ steps can be added with ordinal greater than lastStep. If and when 416 // that happens the states can not be treated as sequential steps and the condition in 417 // following while loop needs to be changed. We can use euqals/ not equals operator to check 418 // if the procedure has reached the user specified state. But there is a possibility that 419 // while loop may not get the control back exaclty when the procedure is in lastStep. Proper 420 // fix would be get all visited states by the procedure and then check if user speccified 421 // state is in that list. Current assumption of sequential proregression of steps/ states is 422 // made at multiple places so we can keep while condition below for simplicity. 423 Procedure<?> proc = procExec.getProcedure(procId); 424 int stepNum = proc instanceof StateMachineProcedure 425 ? ((StateMachineProcedure) proc).getCurrentStateId() 426 : 0; 427 for (;;) { 428 if (stepNum == lastStep) { 429 break; 430 } 431 LOG.info("Restart " + stepNum + " exec state=" + proc); 432 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 433 restartMasterProcedureExecutor(procExec); 434 ProcedureTestingUtility.waitProcedure(procExec, procId); 435 // Old proc object is stale, need to get the new one after ProcedureExecutor restart 436 proc = procExec.getProcedure(procId); 437 stepNum = proc instanceof StateMachineProcedure 438 ? ((StateMachineProcedure) proc).getCurrentStateId() 439 : stepNum + 1; 440 } 441 442 assertEquals(expectExecRunning, procExec.isRunning()); 443 } 444 445 /** 446 * Run through all procedure flow states TWICE while also restarting procedure executor at each 447 * step; i.e force a reread of procedure store. 448 * <p> 449 * It does 450 * <ol> 451 * <li>Execute step N - kill the executor before store update 452 * <li>Restart executor/store 453 * <li>Executes hook for each step twice 454 * <li>Execute step N - and then save to store 455 * </ol> 456 * <p> 457 * This is a good test for finding state that needs persisting and steps that are not idempotent. 458 * Use this version of the test when the order in which flow steps are executed is not start to 459 * finish; where the procedure may vary the flow steps dependent on circumstance found. 460 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean) 461 */ 462 public static void testRecoveryAndDoubleExecution( 463 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook) 464 throws Exception { 465 ProcedureTestingUtility.waitProcedure(procExec, procId); 466 assertEquals(false, procExec.isRunning()); 467 for (int i = 0; !procExec.isFinished(procId); ++i) { 468 LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId)); 469 if (hook != null) { 470 assertTrue(hook.execute(i)); 471 } 472 restartMasterProcedureExecutor(procExec); 473 ProcedureTestingUtility.waitProcedure(procExec, procId); 474 } 475 assertEquals(true, procExec.isRunning()); 476 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 477 } 478 479 public static void testRecoveryAndDoubleExecution( 480 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception { 481 testRecoveryAndDoubleExecution(procExec, procId, null); 482 } 483 484 /** 485 * Hook which will be executed on each step 486 */ 487 public interface StepHook { 488 /** 489 * @param step Step no. at which this will be executed 490 * @return false if test should fail otherwise true 491 */ 492 boolean execute(int step) throws IOException; 493 } 494 495 /** 496 * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an 497 * abort() is injected. If the procedure implement abort() this should result in rollback being 498 * triggered. Each rollback step is called twice, by restarting the executor after every step. At 499 * the end of this call the procedure should be finished and rolledback. This method assert on the 500 * procedure being terminated with an AbortException. 501 */ 502 public static void testRollbackAndDoubleExecution( 503 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep) 504 throws Exception { 505 testRollbackAndDoubleExecution(procExec, procId, lastStep, false); 506 } 507 508 public static void testRollbackAndDoubleExecution( 509 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep, 510 boolean waitForAsyncProcs) throws Exception { 511 // Execute up to last step 512 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 513 514 // Restart the executor and rollback the step twice 515 // rollback step N - kill before store update 516 // restart executor/store 517 // rollback step N - save on store 518 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 519 abortListener.addProcId(procId); 520 procExec.registerListener(abortListener); 521 try { 522 for (int i = 0; !procExec.isFinished(procId); ++i) { 523 LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId)); 524 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 525 restartMasterProcedureExecutor(procExec); 526 ProcedureTestingUtility.waitProcedure(procExec, procId); 527 } 528 } finally { 529 assertTrue(procExec.unregisterListener(abortListener)); 530 } 531 532 if (waitForAsyncProcs) { 533 // Sometimes there are other procedures still executing (including asynchronously spawned by 534 // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before 535 // store update. Let all pending procedures finish normally. 536 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 537 // check 3 times to confirm that the procedure executor has not been killed 538 for (int i = 0; i < 3; i++) { 539 if (!procExec.isRunning()) { 540 LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" 541 + " to KillAndToggleBeforeStoreUpdate flag."); 542 restartMasterProcedureExecutor(procExec); 543 break; 544 } 545 Thread.sleep(1000); 546 } 547 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 548 } 549 550 assertEquals(true, procExec.isRunning()); 551 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 552 } 553 554 /** 555 * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an 556 * abort() is injected. If the procedure implement abort() this should result in rollback being 557 * triggered. At the end of this call the procedure should be finished and rolledback. This method 558 * assert on the procedure being terminated with an AbortException. 559 */ 560 public static void testRollbackRetriableFailure( 561 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep) 562 throws Exception { 563 // Execute up to last step 564 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 565 566 // execute the rollback 567 testRestartWithAbort(procExec, procId); 568 569 assertEquals(true, procExec.isRunning()); 570 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 571 } 572 573 /** 574 * Restart the ProcedureExecutor and inject an abort to the specified procedure. If the procedure 575 * implement abort() this should result in rollback being triggered. At the end of this call the 576 * procedure should be finished and rolledback, if abort is implemnted 577 */ 578 public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec, 579 long procId) throws Exception { 580 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 581 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 582 abortListener.addProcId(procId); 583 procExec.registerListener(abortListener); 584 try { 585 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 586 LOG.info("Restart and rollback procId=" + procId); 587 restartMasterProcedureExecutor(procExec); 588 ProcedureTestingUtility.waitProcedure(procExec, procId); 589 } finally { 590 assertTrue(procExec.unregisterListener(abortListener)); 591 } 592 } 593 594 public static boolean tableExists(Connection conn, TableName tableName) throws IOException { 595 try (Admin admin = conn.getAdmin()) { 596 return admin.tableExists(tableName); 597 } 598 } 599 600 public static class InjectAbortOnLoadListener 601 implements ProcedureExecutor.ProcedureExecutorListener { 602 private final ProcedureExecutor<MasterProcedureEnv> procExec; 603 private TreeSet<Long> procsToAbort = null; 604 605 public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { 606 this.procExec = procExec; 607 } 608 609 public void addProcId(long procId) { 610 if (procsToAbort == null) { 611 procsToAbort = new TreeSet<>(); 612 } 613 procsToAbort.add(procId); 614 } 615 616 @Override 617 public void procedureLoaded(long procId) { 618 if (procsToAbort != null && !procsToAbort.contains(procId)) { 619 return; 620 } 621 procExec.abort(procId); 622 } 623 624 @Override 625 public void procedureAdded(long procId) { 626 /* no-op */ } 627 628 @Override 629 public void procedureFinished(long procId) { 630 /* no-op */ } 631 } 632}