001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.everyItem;
023import static org.hamcrest.Matchers.not;
024import static org.junit.Assert.assertEquals;
025
026import java.io.IOException;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.atomic.AtomicBoolean;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.StartTestingClusterOption;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.BalanceRequest;
036import org.apache.hadoop.hbase.master.HMaster;
037import org.apache.hadoop.hbase.master.MasterServices;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
040import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
041import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
042import org.apache.hadoop.hbase.master.region.MasterRegion;
043import org.apache.hadoop.hbase.procedure2.Procedure;
044import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
045import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
046import org.apache.hadoop.hbase.regionserver.HRegionServer;
047import org.apache.hadoop.hbase.testclassification.LargeTests;
048import org.apache.hadoop.hbase.testclassification.MasterTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.FutureUtils;
051import org.hamcrest.BaseMatcher;
052import org.hamcrest.Description;
053import org.hamcrest.Matcher;
054import org.junit.AfterClass;
055import org.junit.Before;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
062
063/**
064 * SCP does not support rollback actually, here we just want to simulate that when there is a code
065 * bug, SCP and its sub procedures will not hang there forever, and it will not mess up the
066 * procedure store.
067 */
068@Category({ MasterTests.class, LargeTests.class })
069public class TestRollbackSCP {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestRollbackSCP.class);
074
075  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
076
077  private static final TableName TABLE_NAME = TableName.valueOf("test");
078
079  private static final byte[] FAMILY = Bytes.toBytes("family");
080
081  private static final AtomicBoolean INJECTED = new AtomicBoolean(false);
082
083  private static final class AssignmentManagerForTest extends AssignmentManager {
084
085    public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) {
086      super(master, masterRegion);
087    }
088
089    @Override
090    CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
091      TransitRegionStateProcedure proc = regionNode.getProcedure();
092      if (!regionNode.getRegionInfo().isMetaRegion() && proc.hasParent()) {
093        Procedure<?> p =
094          getMaster().getMasterProcedureExecutor().getProcedure(proc.getRootProcId());
095        // fail the procedure if it is a sub procedure for SCP
096        if (p instanceof ServerCrashProcedure) {
097          if (INJECTED.compareAndSet(false, true)) {
098            ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(
099              getMaster().getMasterProcedureExecutor(), true);
100          }
101          return FutureUtils.failedFuture(new RuntimeException("inject code bug"));
102        }
103      }
104      return super.persistToMeta(regionNode);
105    }
106  }
107
108  public static final class HMasterForTest extends HMaster {
109
110    public HMasterForTest(Configuration conf) throws IOException {
111      super(conf);
112    }
113
114    @Override
115    protected AssignmentManager createAssignmentManager(MasterServices master,
116      MasterRegion masterRegion) {
117      return new AssignmentManagerForTest(master, masterRegion);
118    }
119  }
120
121  @BeforeClass
122  public static void setUpBeforeClass() throws Exception {
123    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
124    UTIL.startMiniCluster(StartTestingClusterOption.builder().numDataNodes(3).numRegionServers(3)
125      .masterClass(HMasterForTest.class).build());
126    UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
127    UTIL.waitTableAvailable(TABLE_NAME);
128    UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
129    UTIL.waitUntilNoRegionsInTransition();
130    UTIL.getAdmin().balancerSwitch(false, true);
131  }
132
133  @AfterClass
134  public static void tearDownAfterClass() throws IOException {
135    UTIL.shutdownMiniCluster();
136  }
137
138  @Before
139  public void setUp() throws IOException {
140    UTIL.ensureSomeNonStoppedRegionServersAvailable(2);
141  }
142
143  private ServerCrashProcedure getSCPForServer(ServerName serverName) throws IOException {
144    return UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
145      .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p)
146      .filter(p -> p.getServerName().equals(serverName)).findFirst().orElse(null);
147  }
148
149  private Matcher<Procedure<MasterProcedureEnv>> subProcOf(Procedure<MasterProcedureEnv> proc) {
150    return new BaseMatcher<Procedure<MasterProcedureEnv>>() {
151
152      @Override
153      public boolean matches(Object item) {
154        if (!(item instanceof Procedure)) {
155          return false;
156        }
157        Procedure<?> p = (Procedure<?>) item;
158        return p.hasParent() && p.getRootProcId() == proc.getProcId();
159      }
160
161      @Override
162      public void describeTo(Description description) {
163        description.appendText("sub procedure of(").appendValue(proc).appendText(")");
164      }
165    };
166  }
167
168  @Test
169  public void testFailAndRollback() throws Exception {
170    HRegionServer rsWithMeta = UTIL.getRSForFirstRegionInTable(TableName.META_TABLE_NAME);
171    UTIL.getMiniHBaseCluster().killRegionServer(rsWithMeta.getServerName());
172    UTIL.waitFor(15000, () -> getSCPForServer(rsWithMeta.getServerName()) != null);
173    ServerCrashProcedure scp = getSCPForServer(rsWithMeta.getServerName());
174    ProcedureExecutor<MasterProcedureEnv> procExec =
175      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
176    // wait for the procedure to stop, as we inject a code bug and also set kill before store update
177    UTIL.waitFor(30000, () -> !procExec.isRunning());
178    // make sure that finally we could successfully rollback the procedure
179    while (scp.getState() != ProcedureState.FAILED || !procExec.isRunning()) {
180      MasterProcedureTestingUtility.restartMasterProcedureExecutor(procExec);
181      ProcedureTestingUtility.waitProcedure(procExec, scp);
182    }
183    assertEquals(scp.getState(), ProcedureState.FAILED);
184    assertThat(scp.getException().getMessage(), containsString("inject code bug"));
185    // make sure all sub procedures are cleaned up
186    assertThat(procExec.getProcedures(), everyItem(not(subProcOf(scp))));
187  }
188}