/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  private MasterProcedureTestingUtility() {
  }

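  /**
   * Restarts the given master ProcedureExecutor while simulating a master restart: master services
   * and the AssignmentManager are stopped, the procedure store is reloaded, regions in transition
   * are rebuilt from the still-pending TransitRegionStateProcedures, and the AssignmentManager
   * rejoins the cluster, following the same flow as HMaster.finishActiveMasterInitialization.
   */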
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
    throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          master.setServiceStarted(false);
          AssignmentManager am = env.getAssignmentManager();
          // try to simulate a master restart by removing the ServerManager states about seqIDs
          for (RegionState regionState : am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // follow the same approach as HMaster.finishActiveMasterInitialization; see the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          // create server state node, to simulate master start up
          env.getMasterServices().getServerManager().getOnlineServersList()
            .forEach(am.getRegionStates()::createServer);
          master.setServiceStarted(true);
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            am.wakeMetaLoadedEvent();
            master.setInitialized(true);
          } catch (Exception e) {
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  // Master failover utils
  // ==========================================================================
  public static void masterFailover(final HBaseTestingUtility testUtil) throws Exception {
    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(cluster.getMaster().getServerName());

    // Wait for the backup master to become the active master
    waitBackupMaster(testUtil, oldMaster);
  }

  public static void waitBackupMaster(final HBaseTestingUtility testUtil, final HMaster oldMaster)
    throws Exception {
    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  // Table Helpers
  // ==========================================================================
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < family.length; ++i) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
    }
    return builder.build();
  }

  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
    final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (int i = 0; i < regions.length; ++i) {
      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (int j = 0; j < family.length; ++j) {
        final Path familyDir = new Path(regionDir, family[j]);
        if (hasFamilyDirs) {
          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn(family[j] + " family dir does not exist");
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (int i = 0; i < family.length; ++i) {
      assertTrue("family not found " + family[i],
        htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());

    // check that the store file tracker impl has been properly set in the htd
    String storeFileTrackerImpl =
      StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration());
    assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL));
  }

  public static void validateTableDeletion(final HMaster master, final TableName tableName)
    throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null);
  }

  private static int countMetaRegions(final HMaster master, final TableName tableName)
    throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in " + rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegionInfo().getTable().equals(tableName)) {
          return false;
        }
        if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
        HRegionLocation[] locations = list.getRegionLocations();
        for (HRegionLocation location : locations) {
          if (location == null) continue;
          ServerName serverName = location.getServerName();
          // Make sure that regions are assigned to a server
          if (serverName != null && serverName.getAddress() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertEquals(TableState.State.ENABLED, tsm.getTableState(tableName).getState());
  }

  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertEquals(TableState.State.DISABLED, tsm.getTableState(tableName).getState());
  }

  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(family.getBytes()));
  }

  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(family.getBytes()));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  public static void validateColumnFamilyModification(final HMaster master,
    final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
    throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes());
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

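  /**
   * Loads {@code rows} rows into the given table through a BufferedMutator, writing one column
   * {@code q} per family with SKIP_WAL durability. One row is written per split key first, so
   * every region receives at least one row, then the remaining rows are written with MD5-derived
   * keys.
   */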
  public static void loadData(final Connection connection, final TableName tableName, int rows,
    final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    BufferedMutator mutator = connection.getBufferedMutator(tableName);

    // Ensure one row per region
    assertTrue(rows >= splitKeys.length);
    for (byte[] k : splitKeys) {
      byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k);
      byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
      mutator.mutate(createPut(families, key, value));
      rows--;
    }

    // Add the remaining rows; more rows mean more store files
    while (rows-- > 0) {
      byte[] value =
        Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows));
      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
      mutator.mutate(createPut(families, key, value));
    }
    mutator.flush();
  }

  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family : families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  // Procedure Helpers
  // ==========================================================================
  public static long generateNonceGroup(final HMaster master) {
    return master.getClusterConnection().getNonceGenerator().getNonceGroup();
  }

  public static long generateNonce(final HMaster master) {
    return master.getClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step, i.e. force a reread of the procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when a procedure executes all flow steps from start to finish.
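   * <p>
   * A minimal usage sketch (assumes the kill-before-store-update flag has been enabled and a
   * procedure has already been submitted; {@code htd}, {@code regions} and {@code lastStep} are
   * illustrative):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // run every state twice, restarting the executor after each injected kill
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, lastStep, true);
   * </pre>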
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());

    // Restart the executor and execute the step twice
    // execute step N - kill before store update
    // restart executor/store
    // execute step N - save on store
    // NOTE: currently we assume that states/steps are sequential. There are already instances of
    // procedures which skip (don't use) intermediate states/steps. In the future, intermediate
    // states/steps could be added with an ordinal greater than lastStep. If and when that happens
    // the states can no longer be treated as sequential steps and the condition in the following
    // loop needs to be changed. We could use an equals/not-equals check to see whether the
    // procedure has reached the user-specified state, but the loop might not regain control
    // exactly when the procedure is in lastStep. A proper fix would be to collect all states
    // visited by the procedure and then check whether the user-specified state is in that list.
    // The assumption of sequential progression of steps/states is made in multiple places, so we
    // keep the loop condition below for simplicity.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = proc instanceof StateMachineProcedure
      ? ((StateMachineProcedure) proc).getCurrentStateId()
      : 0;
    while (stepNum != lastStep) {
      LOG.info("Restart " + stepNum + " exec state=" + proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      stepNum = proc instanceof StateMachineProcedure
        ? ((StateMachineProcedure) proc).getCurrentStateId()
        : stepNum + 1;
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step, i.e. force a reread of the procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute the hook for each step twice
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when the order in which flow steps are executed is not start to
   * finish, i.e. where the procedure may vary the flow steps depending on the circumstances found.
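   * <p>
   * A minimal usage sketch with a hook (assumes the kill-before-store-update flag has been enabled
   * and the procedure has already been submitted; the hook body is illustrative):
   *
   * <pre>
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, step -> {
   *   LOG.info("About to restart the executor, step=" + step);
   *   return true; // returning false fails the test
   * });
   * </pre>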
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
    throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step
   */
  public interface StepHook {
    /**
     * @param step the step number at which this hook is executed
     * @return false if the test should fail, true otherwise
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Executes the procedure up to "lastStep", then restarts the ProcedureExecutor and injects an
   * abort(). If the procedure implements abort() this should trigger a rollback. Each rollback
   * step is executed twice, by restarting the executor after every step. At the end of this call
   * the procedure should be finished and rolled back. This method asserts that the procedure
   * terminated with an AbortException.
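   * <p>
   * A minimal usage sketch (assumes the same setup as
   * {@link #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)};
   * {@code lastStep} is illustrative):
   *
   * <pre>
   * // run up to lastStep, then abort and roll back, double-executing each rollback step
   * MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
   * </pre>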
   */
  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    // rollback step N - kill before store update
    // restart executor/store
    // rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes there are other procedures still executing (including ones spawned
      // asynchronously by procId) and, due to the KillAndToggleBeforeStoreUpdate flag, the
      // ProcedureExecutor is stopped before the store update. Let all pending procedures finish
      // normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due"
            + " to KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Executes the procedure up to "lastStep", then restarts the ProcedureExecutor and injects an
   * abort(). If the procedure implements abort() this should trigger a rollback. At the end of
   * this call the procedure should be finished and rolled back. This method asserts that the
   * procedure terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restarts the ProcedureExecutor and injects an abort into the specified procedure. If the
   * procedure implements abort() this should trigger a rollback. At the end of this call the
   * procedure should be finished and rolled back, if abort is implemented.
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
    long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId=" + procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

  public static boolean tableExists(Connection conn, TableName tableName) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(tableName);
    }
  }

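  /**
   * ProcedureExecutorListener that injects an abort() into the registered procedure ids as they
   * are loaded from the store on executor restart. If no procedure id has been added via
   * {@link #addProcId(long)}, every loaded procedure is aborted.
   */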
  public static class InjectAbortOnLoadListener
    implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) {
      /* no-op */ }

    @Override
    public void procedureFinished(long procId) {
      /* no-op */ }
  }
}