001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({ MasterTests.class, SmallTests.class })
044public class TestYieldProcedures {
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestYieldProcedures.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
050
051  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
052  private static final Procedure NULL_PROC = null;
053
054  private ProcedureExecutor<TestProcEnv> procExecutor;
055  private TestScheduler procRunnables;
056  private ProcedureStore procStore;
057
058  private HBaseCommonTestingUtility htu;
059  private FileSystem fs;
060  private Path testDir;
061  private Path logDir;
062
063  @Before
064  public void setUp() throws IOException {
065    htu = new HBaseCommonTestingUtility();
066    testDir = htu.getDataTestDir();
067    fs = testDir.getFileSystem(htu.getConfiguration());
068    assertTrue(testDir.depth() > 1);
069
070    logDir = new Path(testDir, "proc-logs");
071    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
072    procRunnables = new TestScheduler();
073    procExecutor =
074      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
077  }
078
079  @After
080  public void tearDown() throws IOException {
081    procExecutor.stop();
082    procStore.stop(false);
083    fs.delete(logDir, true);
084  }
085
086  @Test
087  public void testYieldEachExecutionStep() throws Exception {
088    final int NUM_STATES = 3;
089
090    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
091    for (int i = 0; i < procs.length; ++i) {
092      procs[i] = new TestStateMachineProcedure(true, false);
093      procExecutor.submitProcedure(procs[i]);
094    }
095    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
096
097    for (int i = 0; i < procs.length; ++i) {
098      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
099
100      // verify execution
101      int index = 0;
102      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
103        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
104        assertEquals(false, info.isRollback());
105        assertEquals(execStep, info.getStep().ordinal());
106      }
107
108      // verify rollback
109      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
110        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
111        assertEquals(true, info.isRollback());
112        assertEquals(execStep, info.getStep().ordinal());
113      }
114    }
115
116    // check runnable queue stats
117    assertEquals(0, procRunnables.size());
118    assertEquals(0, procRunnables.addFrontCalls);
119    assertEquals(15, procRunnables.addBackCalls);
120    assertEquals(12, procRunnables.yieldCalls);
121    assertEquals(16, procRunnables.pollCalls);
122    assertEquals(3, procRunnables.completionCalls);
123  }
124
125  @Test
126  public void testYieldOnInterrupt() throws Exception {
127    final int NUM_STATES = 3;
128    int count = 0;
129
130    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
131    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
132
133    // test execute (we execute steps twice, one has the IE the other completes)
134    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
135    for (int i = 0; i < NUM_STATES; ++i) {
136      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
137      assertEquals(false, info.isRollback());
138      assertEquals(i, info.getStep().ordinal());
139
140      info = proc.getExecutionInfo().get(count++);
141      assertEquals(false, info.isRollback());
142      assertEquals(i, info.getStep().ordinal());
143    }
144
145    // test rollback (we execute steps twice, rollback counts both IE and completed)
146    for (int i = NUM_STATES - 1; i >= 0; --i) {
147      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
148      assertEquals(true, info.isRollback());
149      assertEquals(i, info.getStep().ordinal());
150    }
151
152    for (int i = NUM_STATES - 1; i >= 0; --i) {
153      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
154      assertEquals(true, info.isRollback());
155      assertEquals(0, info.getStep().ordinal());
156    }
157
158    // check runnable queue stats
159    assertEquals(0, procRunnables.size());
160    assertEquals(0, procRunnables.addFrontCalls);
161    assertEquals(11, procRunnables.addBackCalls);
162    assertEquals(10, procRunnables.yieldCalls);
163    assertEquals(12, procRunnables.pollCalls);
164    assertEquals(1, procRunnables.completionCalls);
165  }
166
167  @Test
168  public void testYieldException() {
169    TestYieldProcedure proc = new TestYieldProcedure();
170    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
171    assertEquals(6, proc.step);
172
173    // check runnable queue stats
174    assertEquals(0, procRunnables.size());
175    assertEquals(0, procRunnables.addFrontCalls);
176    assertEquals(6, procRunnables.addBackCalls);
177    assertEquals(5, procRunnables.yieldCalls);
178    assertEquals(7, procRunnables.pollCalls);
179    assertEquals(1, procRunnables.completionCalls);
180  }
181
182  private static class TestProcEnv {
183    public final AtomicLong timestamp = new AtomicLong(0);
184
185    public long nextTimestamp() {
186      return timestamp.incrementAndGet();
187    }
188  }
189
190  public static class TestStateMachineProcedure
191    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
192    enum State {
193      STATE_1,
194      STATE_2,
195      STATE_3
196    }
197
198    public static class ExecutionInfo {
199      private final boolean rollback;
200      private final long timestamp;
201      private final State step;
202
203      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
204        this.timestamp = timestamp;
205        this.step = step;
206        this.rollback = isRollback;
207      }
208
209      public State getStep() {
210        return step;
211      }
212
213      public long getTimestamp() {
214        return timestamp;
215      }
216
217      public boolean isRollback() {
218        return rollback;
219      }
220    }
221
222    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
223    private final AtomicBoolean aborted = new AtomicBoolean(false);
224    private final boolean throwInterruptOnceOnEachStep;
225    private final boolean abortOnFinalStep;
226
227    public TestStateMachineProcedure() {
228      this(false, false);
229    }
230
231    public TestStateMachineProcedure(boolean abortOnFinalStep,
232      boolean throwInterruptOnceOnEachStep) {
233      this.abortOnFinalStep = abortOnFinalStep;
234      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
235    }
236
237    public ArrayList<ExecutionInfo> getExecutionInfo() {
238      return executionInfo;
239    }
240
241    @Override
242    protected boolean isRollbackSupported(State state) {
243      return true;
244    }
245
246    @Override
247    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
248      throws InterruptedException {
249      final long ts = env.nextTimestamp();
250      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
251      executionInfo.add(new ExecutionInfo(ts, state, false));
252      Thread.sleep(150);
253
254      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
255        LOG.debug("THROW INTERRUPT");
256        throw new InterruptedException("test interrupt");
257      }
258
259      switch (state) {
260        case STATE_1:
261          setNextState(State.STATE_2);
262          break;
263        case STATE_2:
264          setNextState(State.STATE_3);
265          break;
266        case STATE_3:
267          if (abortOnFinalStep) {
268            setFailure("test", new IOException("Requested abort on final step"));
269          }
270          return Flow.NO_MORE_STATE;
271        default:
272          throw new UnsupportedOperationException();
273      }
274      return Flow.HAS_MORE_STATE;
275    }
276
277    @Override
278    protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException {
279      final long ts = env.nextTimestamp();
280      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
281      executionInfo.add(new ExecutionInfo(ts, state, true));
282      Thread.sleep(150);
283
284      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
285        LOG.debug("THROW INTERRUPT");
286        throw new InterruptedException("test interrupt");
287      }
288
289      switch (state) {
290        case STATE_1:
291          break;
292        case STATE_2:
293          break;
294        case STATE_3:
295          break;
296        default:
297          throw new UnsupportedOperationException();
298      }
299    }
300
301    @Override
302    protected State getState(final int stateId) {
303      return State.values()[stateId];
304    }
305
306    @Override
307    protected int getStateId(final State state) {
308      return state.ordinal();
309    }
310
311    @Override
312    protected State getInitialState() {
313      return State.STATE_1;
314    }
315
316    @Override
317    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
318      return true;
319    }
320
321    @Override
322    protected boolean abort(TestProcEnv env) {
323      aborted.set(true);
324      return true;
325    }
326  }
327
328  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
329    private int step = 0;
330
331    public TestYieldProcedure() {
332    }
333
334    @Override
335    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
336      LOG.info("execute step " + step);
337      if (step++ < 5) {
338        throw new ProcedureYieldException();
339      }
340      return null;
341    }
342
343    @Override
344    protected void rollback(TestProcEnv env) {
345    }
346
347    @Override
348    protected boolean abort(TestProcEnv env) {
349      return false;
350    }
351
352    @Override
353    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
354      return true;
355    }
356
357    @Override
358    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
359    }
360
361    @Override
362    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
363    }
364  }
365
366  private static class TestScheduler extends SimpleProcedureScheduler {
367    private int completionCalls;
368    private int addFrontCalls;
369    private int addBackCalls;
370    private int yieldCalls;
371    private int pollCalls;
372
373    public TestScheduler() {
374    }
375
376    @Override
377    public void addFront(final Procedure proc) {
378      addFrontCalls++;
379      super.addFront(proc);
380    }
381
382    @Override
383    public void addBack(final Procedure proc) {
384      addBackCalls++;
385      super.addBack(proc);
386    }
387
388    @Override
389    public void yield(final Procedure proc) {
390      yieldCalls++;
391      super.yield(proc);
392    }
393
394    @Override
395    public Procedure poll() {
396      pollCalls++;
397      return super.poll();
398    }
399
400    @Override
401    public Procedure poll(long timeout, TimeUnit unit) {
402      pollCalls++;
403      return super.poll(timeout, unit);
404    }
405
406    @Override
407    public void completionCleanup(Procedure proc) {
408      completionCalls++;
409    }
410  }
411}