001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.everyItem;
023import static org.hamcrest.Matchers.not;
024import static org.junit.Assert.assertEquals;
025
026import java.io.IOException;
027import java.util.concurrent.atomic.AtomicBoolean;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.StartMiniClusterOption;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.client.BalanceRequest;
035import org.apache.hadoop.hbase.master.HMaster;
036import org.apache.hadoop.hbase.master.MasterServices;
037import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
038import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
039import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility;
040import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
041import org.apache.hadoop.hbase.master.region.MasterRegion;
042import org.apache.hadoop.hbase.procedure2.Procedure;
043import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
044import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
045import org.apache.hadoop.hbase.regionserver.HRegionServer;
046import org.apache.hadoop.hbase.testclassification.LargeTests;
047import org.apache.hadoop.hbase.testclassification.MasterTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.hamcrest.BaseMatcher;
050import org.hamcrest.Description;
051import org.hamcrest.Matcher;
052import org.junit.AfterClass;
053import org.junit.Before;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
060
061/**
062 * SCP does not support rollback actually, here we just want to simulate that when there is a code
063 * bug, SCP and its sub procedures will not hang there forever, and it will not mess up the
064 * procedure store.
065 */
066@Category({ MasterTests.class, LargeTests.class })
067public class TestRollbackSCP {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestRollbackSCP.class);
072
073  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
074
075  private static final TableName TABLE_NAME = TableName.valueOf("test");
076
077  private static final byte[] FAMILY = Bytes.toBytes("family");
078
079  private static final AtomicBoolean INJECTED = new AtomicBoolean(false);
080
081  private static final class AssignmentManagerForTest extends AssignmentManager {
082
083    public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion) {
084      super(master, masterRegion);
085    }
086
087    @Override
088    void persistToMeta(RegionStateNode regionNode) throws IOException {
089      TransitRegionStateProcedure proc = regionNode.getProcedure();
090      if (!regionNode.getRegionInfo().isMetaRegion() && proc.hasParent()) {
091        Procedure<?> p =
092          getMaster().getMasterProcedureExecutor().getProcedure(proc.getRootProcId());
093        // fail the procedure if it is a sub procedure for SCP
094        if (p instanceof ServerCrashProcedure) {
095          if (INJECTED.compareAndSet(false, true)) {
096            ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(
097              getMaster().getMasterProcedureExecutor(), true);
098          }
099          throw new RuntimeException("inject code bug");
100        }
101      }
102      super.persistToMeta(regionNode);
103    }
104  }
105
106  public static final class HMasterForTest extends HMaster {
107
108    public HMasterForTest(Configuration conf) throws IOException {
109      super(conf);
110    }
111
112    @Override
113    protected AssignmentManager createAssignmentManager(MasterServices master,
114      MasterRegion masterRegion) {
115      return new AssignmentManagerForTest(master, masterRegion);
116    }
117  }
118
119  @BeforeClass
120  public static void setUpBeforeClass() throws Exception {
121    UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
122    UTIL.startMiniCluster(StartMiniClusterOption.builder().numDataNodes(3).numRegionServers(3)
123      .masterClass(HMasterForTest.class).build());
124    UTIL.createMultiRegionTable(TABLE_NAME, FAMILY);
125    UTIL.waitTableAvailable(TABLE_NAME);
126    UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build());
127    UTIL.waitUntilNoRegionsInTransition();
128    UTIL.getAdmin().balancerSwitch(false, true);
129  }
130
131  @AfterClass
132  public static void tearDownAfterClass() throws IOException {
133    UTIL.shutdownMiniCluster();
134  }
135
136  @Before
137  public void setUp() throws IOException {
138    UTIL.ensureSomeNonStoppedRegionServersAvailable(2);
139  }
140
141  private ServerCrashProcedure getSCPForServer(ServerName serverName) throws IOException {
142    return UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
143      .filter(p -> p instanceof ServerCrashProcedure).map(p -> (ServerCrashProcedure) p)
144      .filter(p -> p.getServerName().equals(serverName)).findFirst().orElse(null);
145  }
146
147  private Matcher<Procedure<MasterProcedureEnv>> subProcOf(Procedure<MasterProcedureEnv> proc) {
148    return new BaseMatcher<Procedure<MasterProcedureEnv>>() {
149
150      @Override
151      public boolean matches(Object item) {
152        if (!(item instanceof Procedure)) {
153          return false;
154        }
155        Procedure<?> p = (Procedure<?>) item;
156        return p.hasParent() && p.getRootProcId() == proc.getProcId();
157      }
158
159      @Override
160      public void describeTo(Description description) {
161        description.appendText("sub procedure of(").appendValue(proc).appendText(")");
162      }
163    };
164  }
165
166  @Test
167  public void testFailAndRollback() throws Exception {
168    HRegionServer rsWithMeta = UTIL.getRSForFirstRegionInTable(TableName.META_TABLE_NAME);
169    UTIL.getMiniHBaseCluster().killRegionServer(rsWithMeta.getServerName());
170    UTIL.waitFor(15000, () -> getSCPForServer(rsWithMeta.getServerName()) != null);
171    ServerCrashProcedure scp = getSCPForServer(rsWithMeta.getServerName());
172    ProcedureExecutor<MasterProcedureEnv> procExec =
173      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
174    // wait for the procedure to stop, as we inject a code bug and also set kill before store update
175    UTIL.waitFor(30000, () -> !procExec.isRunning());
176    // make sure that finally we could successfully rollback the procedure
177    while (scp.getState() != ProcedureState.FAILED || !procExec.isRunning()) {
178      MasterProcedureTestingUtility.restartMasterProcedureExecutor(procExec);
179      ProcedureTestingUtility.waitProcedure(procExec, scp);
180    }
181    assertEquals(scp.getState(), ProcedureState.FAILED);
182    assertThat(scp.getException().getMessage(), containsString("inject code bug"));
183    // make sure all sub procedures are cleaned up
184    assertThat(procExec.getProcedures(), everyItem(not(subProcOf(scp))));
185  }
186}