001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.concurrent.Exchanger;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
031import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.After;
035import org.junit.AfterClass;
036import org.junit.Before;
037import org.junit.BeforeClass;
038import org.junit.ClassRule;
039import org.junit.Rule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.junit.rules.TestName;
043
044import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
045
046@Category({ MasterTests.class, SmallTests.class })
047public class TestForceUpdateProcedure {
048
049  @ClassRule
050  public static final HBaseClassTestRule CLASS_RULE =
051    HBaseClassTestRule.forClass(TestForceUpdateProcedure.class);
052
053  private static HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
054
055  private static WALProcedureStore STORE;
056
057  private static ProcedureExecutor<Void> EXEC;
058
059  private static Exchanger<Boolean> EXCHANGER = new Exchanger<>();
060
061  private static int WAL_COUNT = 5;
062
063  @Rule
064  public final TestName name = new TestName();
065
066  private void createStoreAndExecutor() throws IOException {
067    UTIL.getConfiguration().setInt(CompletedProcedureCleaner.CLEANER_INTERVAL_CONF_KEY, 1000);
068    Path logDir = UTIL.getDataTestDir(name.getMethodName());
069    STORE = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
070    STORE.start(1);
071    EXEC = new ProcedureExecutor<Void>(UTIL.getConfiguration(), null, STORE);
072    ProcedureTestingUtility.initAndStartWorkers(EXEC, 1, true);
073  }
074
075  @BeforeClass
076  public static void setUpBeforeClass() throws IOException {
077    UTIL.getConfiguration().setInt(WALProcedureStore.WAL_COUNT_WARN_THRESHOLD_CONF_KEY, WAL_COUNT);
078  }
079
080  private void stopStoreAndExecutor() {
081    EXEC.stop();
082    STORE.stop(false);
083    EXEC = null;
084    STORE = null;
085  }
086
087  @AfterClass
088  public static void tearDownAfterClass() throws IOException {
089    UTIL.cleanupTestDir();
090  }
091
092  @Before
093  public void setUp() throws IOException {
094    createStoreAndExecutor();
095  }
096
097  @After
098  public void tearDown() {
099    stopStoreAndExecutor();
100  }
101
102  public static final class WaitingProcedure extends NoopProcedure<Void> {
103
104    @Override
105    protected Procedure<Void>[] execute(Void env)
106      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
107      EXCHANGER.exchange(Boolean.TRUE);
108      setState(ProcedureState.WAITING_TIMEOUT);
109      setTimeout(Integer.MAX_VALUE);
110      throw new ProcedureSuspendedException();
111    }
112  }
113
114  public static final class ParentProcedure extends NoopProcedure<Void> {
115
116    @SuppressWarnings("unchecked")
117    @Override
118    protected Procedure<Void>[] execute(Void env)
119      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
120      return new Procedure[] { new NoopProcedure<>(), new WaitingProcedure() };
121    }
122  }
123
124  public static final class ExchangeProcedure extends NoopProcedure<Void> {
125
126    @SuppressWarnings("unchecked")
127    @Override
128    protected Procedure<Void>[] execute(Void env)
129      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
130      if (EXCHANGER.exchange(Boolean.TRUE)) {
131        return new Procedure[] { this };
132      } else {
133        return null;
134      }
135    }
136  }
137
138  public static final class NoopNoAckProcedure extends NoopProcedure<Void> {
139
140    @Override
141    protected boolean shouldWaitClientAck(Void env) {
142      return false;
143    }
144  }
145
146  @Test
147  public void testProcedureStuck() throws IOException, InterruptedException {
148    EXEC.submitProcedure(new ParentProcedure());
149    EXCHANGER.exchange(Boolean.TRUE);
150    UTIL.waitFor(10000, () -> EXEC.getActiveExecutorCount() == 0);
151    // The above operations are used to make sure that we have persist the states of the two
152    // procedures.
153    long procId = EXEC.submitProcedure(new ExchangeProcedure());
154    assertEquals(1, STORE.getActiveLogs().size());
155    for (int i = 0; i < WAL_COUNT - 1; i++) {
156      assertTrue(STORE.rollWriterForTesting());
157      // The WaitinProcedure never gets updated so we can not delete the oldest wal file, so the
158      // number of wal files will increase
159      assertEquals(2 + i, STORE.getActiveLogs().size());
160      EXCHANGER.exchange(Boolean.TRUE);
161      Thread.sleep(1000);
162    }
163    STORE.rollWriterForTesting();
164    // Finish the ExchangeProcedure
165    EXCHANGER.exchange(Boolean.FALSE);
166    // Make sure that we can delete several wal files because we force update the state of
167    // WaitingProcedure. Notice that the last closed wal files can not be deleted, as when rolling
168    // the newest wal file does not have anything in it, and in the closed file we still have the
169    // state for the ExchangeProcedure so it can not be deleted
170    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 2);
171    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
172    // Make sure that after the force update we could still load the procedures
173    stopStoreAndExecutor();
174    createStoreAndExecutor();
175    Map<Class<?>, Procedure<Void>> procMap = new HashMap<>();
176    EXEC.getActiveProceduresNoCopy().forEach(p -> procMap.put(p.getClass(), p));
177    assertEquals(3, procMap.size());
178    ParentProcedure parentProc = (ParentProcedure) procMap.get(ParentProcedure.class);
179    assertEquals(ProcedureState.WAITING, parentProc.getState());
180    WaitingProcedure waitingProc = (WaitingProcedure) procMap.get(WaitingProcedure.class);
181    assertEquals(ProcedureState.WAITING_TIMEOUT, waitingProc.getState());
182    NoopProcedure<Void> noopProc = (NoopProcedure<Void>) procMap.get(NoopProcedure.class);
183    assertEquals(ProcedureState.SUCCESS, noopProc.getState());
184  }
185
186  @Test
187  public void testCompletedProcedure() throws InterruptedException, IOException {
188    long procId = EXEC.submitProcedure(new ExchangeProcedure());
189    EXCHANGER.exchange(Boolean.FALSE);
190    UTIL.waitFor(10000, () -> EXEC.isFinished(procId));
191    for (int i = 0; i < WAL_COUNT - 1; i++) {
192      assertTrue(STORE.rollWriterForTesting());
193      // The exchange procedure is completed but still not deleted yet so we can not delete the
194      // oldest wal file
195      long pid = EXEC.submitProcedure(new NoopNoAckProcedure());
196      assertEquals(2 + i, STORE.getActiveLogs().size());
197      UTIL.waitFor(10000, () -> EXEC.isFinished(pid));
198    }
199    // Only the exchange procedure can not be deleted
200    UTIL.waitFor(10000, () -> EXEC.getCompletedSize() == 1);
201    STORE.rollWriterForTesting();
202    UTIL.waitFor(10000, () -> STORE.getActiveLogs().size() <= 1);
203  }
204}