001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
047
048@Category({ MasterTests.class, MediumTests.class })
049public class TestProcedureRecovery {
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestProcedureRecovery.class);
053
054  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);
055
056  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
057
058  private static TestProcEnv procEnv;
059  private static ProcedureExecutor<TestProcEnv> procExecutor;
060  private static ProcedureStore procStore;
061  private static int procSleepInterval;
062
063  private HBaseCommonTestingUtility htu;
064  private FileSystem fs;
065  private Path testDir;
066  private Path logDir;
067
068  @Before
069  public void setUp() throws IOException {
070    htu = new HBaseCommonTestingUtility();
071    testDir = htu.getDataTestDir();
072    fs = testDir.getFileSystem(htu.getConfiguration());
073    assertTrue(testDir.depth() > 1);
074
075    logDir = new Path(testDir, "proc-logs");
076    procEnv = new TestProcEnv();
077    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
078    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
079    procExecutor.testing = new ProcedureExecutor.Testing();
080    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
081    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
082    procSleepInterval = 0;
083  }
084
085  @After
086  public void tearDown() throws IOException {
087    procExecutor.stop();
088    procStore.stop(false);
089    fs.delete(logDir, true);
090  }
091
092  private void restart() throws Exception {
093    dumpLogDirState();
094    ProcedureTestingUtility.restart(procExecutor);
095    dumpLogDirState();
096  }
097
098  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
099    private int step = 0;
100
101    public TestSingleStepProcedure() {
102    }
103
104    @Override
105    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
106      env.waitOnLatch();
107      LOG.debug("execute procedure " + this + " step=" + step);
108      step++;
109      setResult(Bytes.toBytes(step));
110      return null;
111    }
112
113    @Override
114    protected void rollback(TestProcEnv env) {
115    }
116
117    @Override
118    protected boolean abort(TestProcEnv env) {
119      return true;
120    }
121  }
122
123  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
124    private AtomicBoolean abort = new AtomicBoolean(false);
125    private int step = 0;
126
127    @Override
128    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
129      env.waitOnLatch();
130      LOG.debug("execute procedure " + this + " step=" + step);
131      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
132      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
133      step++;
134      Threads.sleepWithoutInterrupt(procSleepInterval);
135      if (isAborted()) {
136        setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException(
137          "got an abort at " + getClass().getName() + " step=" + step)));
138        return null;
139      }
140      return null;
141    }
142
143    @Override
144    protected void rollback(TestProcEnv env) {
145      LOG.debug("rollback procedure " + this + " step=" + step);
146      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
147      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
148      step++;
149    }
150
151    @Override
152    protected boolean abort(TestProcEnv env) {
153      abort.set(true);
154      return true;
155    }
156
157    private boolean isAborted() {
158      boolean aborted = abort.get();
159      BaseTestStepProcedure proc = this;
160      while (proc.hasParent() && !aborted) {
161        proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId());
162        aborted = proc.isAborted();
163      }
164      return aborted;
165    }
166  }
167
168  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
169    public TestMultiStepProcedure() {
170    }
171
172    @Override
173    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
174      super.execute(env);
175      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
176    }
177
178    public static class Step1Procedure extends BaseTestStepProcedure {
179      public Step1Procedure() {
180      }
181
182      @Override
183      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
184        super.execute(env);
185        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
186      }
187    }
188
189    public static class Step2Procedure extends BaseTestStepProcedure {
190      public Step2Procedure() {
191      }
192    }
193  }
194
195  @Test
196  public void testNoopLoad() throws Exception {
197    restart();
198  }
199
200  @Test
201  public void testSingleStepProcRecovery() throws Exception {
202    Procedure proc = new TestSingleStepProcedure();
203    procExecutor.testing.killBeforeStoreUpdate = true;
204    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
205    assertFalse(procExecutor.isRunning());
206    procExecutor.testing.killBeforeStoreUpdate = false;
207
208    // Restart and verify that the procedures restart
209    long restartTs = EnvironmentEdgeManager.currentTime();
210    restart();
211    waitProcedure(procId);
212    Procedure<?> result = procExecutor.getResult(procId);
213    assertTrue(result.getLastUpdate() > restartTs);
214    ProcedureTestingUtility.assertProcNotFailed(result);
215    assertEquals(1, Bytes.toInt(result.getResult()));
216    long resultTs = result.getLastUpdate();
217
218    // Verify that after another restart the result is still there
219    restart();
220    result = procExecutor.getResult(procId);
221    ProcedureTestingUtility.assertProcNotFailed(result);
222    assertEquals(resultTs, result.getLastUpdate());
223    assertEquals(1, Bytes.toInt(result.getResult()));
224  }
225
226  @Test
227  public void testMultiStepProcRecovery() throws Exception {
228    // Step 0 - kill
229    Procedure proc = new TestMultiStepProcedure();
230    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
231    assertFalse(procExecutor.isRunning());
232
233    // Step 0 exec && Step 1 - kill
234    restart();
235    waitProcedure(procId);
236    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
237    assertFalse(procExecutor.isRunning());
238
239    // Step 1 exec && step 2 - kill
240    restart();
241    waitProcedure(procId);
242    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
243    assertFalse(procExecutor.isRunning());
244
245    // Step 2 exec
246    restart();
247    waitProcedure(procId);
248    assertTrue(procExecutor.isRunning());
249
250    // The procedure is completed
251    Procedure<?> result = procExecutor.getResult(procId);
252    ProcedureTestingUtility.assertProcNotFailed(result);
253  }
254
255  @Test
256  public void testMultiStepRollbackRecovery() throws Exception {
257    // Step 0 - kill
258    Procedure proc = new TestMultiStepProcedure();
259    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
260    assertFalse(procExecutor.isRunning());
261
262    // Step 0 exec && Step 1 - kill
263    restart();
264    waitProcedure(procId);
265    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
266    assertFalse(procExecutor.isRunning());
267
268    // Step 1 exec && step 2 - kill
269    restart();
270    waitProcedure(procId);
271    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
272    assertFalse(procExecutor.isRunning());
273
274    // Step 2 exec - rollback - kill
275    procSleepInterval = 2500;
276    restart();
277    assertTrue(procExecutor.abort(procId));
278    waitProcedure(procId);
279    assertFalse(procExecutor.isRunning());
280
281    // rollback - kill
282    restart();
283    waitProcedure(procId);
284    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
285    assertFalse(procExecutor.isRunning());
286
287    // rollback - complete
288    restart();
289    waitProcedure(procId);
290    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
291    assertFalse(procExecutor.isRunning());
292
293    // Restart the executor and get the result
294    restart();
295    waitProcedure(procId);
296
297    // The procedure is completed
298    Procedure<?> result = procExecutor.getResult(procId);
299    ProcedureTestingUtility.assertIsAbortException(result);
300  }
301
302  public static class TestStateMachineProcedure
303    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
304    enum State {
305      STATE_1,
306      STATE_2,
307      STATE_3,
308      DONE
309    }
310
311    public TestStateMachineProcedure() {
312    }
313
314    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
315      this.submitChildProc = testSubmitChildProc;
316    }
317
318    private AtomicBoolean aborted = new AtomicBoolean(false);
319    private int iResult = 0;
320    private boolean submitChildProc = false;
321
322    @Override
323    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
324      switch (state) {
325        case STATE_1:
326          LOG.info("execute step 1 " + this);
327          setNextState(State.STATE_2);
328          iResult += 3;
329          break;
330        case STATE_2:
331          LOG.info("execute step 2 " + this);
332          if (submitChildProc) {
333            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
334            setNextState(State.DONE);
335          } else {
336            setNextState(State.STATE_3);
337          }
338          iResult += 5;
339          break;
340        case STATE_3:
341          LOG.info("execute step 3 " + this);
342          Threads.sleepWithoutInterrupt(procSleepInterval);
343          if (aborted.get()) {
344            LOG.info("aborted step 3 " + this);
345            setAbortFailure("test", "aborted");
346            break;
347          }
348          setNextState(State.DONE);
349          iResult += 7;
350          break;
351        case DONE:
352          if (submitChildProc) {
353            addChildProcedure(new TestStateMachineProcedure());
354          }
355          iResult += 11;
356          setResult(Bytes.toBytes(iResult));
357          return Flow.NO_MORE_STATE;
358        default:
359          throw new UnsupportedOperationException();
360      }
361      return Flow.HAS_MORE_STATE;
362    }
363
364    @Override
365    protected boolean isRollbackSupported(State state) {
366      return true;
367    }
368
369    @Override
370    protected void rollbackState(TestProcEnv env, final State state) {
371      switch (state) {
372        case STATE_1:
373          LOG.info("rollback step 1 " + this);
374          break;
375        case STATE_2:
376          LOG.info("rollback step 2 " + this);
377          break;
378        case STATE_3:
379          LOG.info("rollback step 3 " + this);
380          break;
381        default:
382          throw new UnsupportedOperationException();
383      }
384    }
385
386    @Override
387    protected State getState(final int stateId) {
388      return State.values()[stateId];
389    }
390
391    @Override
392    protected int getStateId(final State state) {
393      return state.ordinal();
394    }
395
396    @Override
397    protected State getInitialState() {
398      return State.STATE_1;
399    }
400
401    @Override
402    protected boolean abort(TestProcEnv env) {
403      aborted.set(true);
404      return true;
405    }
406
407    @Override
408    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
409      super.serializeStateData(serializer);
410      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
411      serializer.serialize(builder.build());
412    }
413
414    @Override
415    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
416      super.deserializeStateData(serializer);
417      Int32Value value = serializer.deserialize(Int32Value.class);
418      iResult = value.getValue();
419    }
420  }
421
422  @Test
423  public void testStateMachineMultipleLevel() throws Exception {
424    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
425    // Wait the completion
426    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
427    Procedure<?> result = procExecutor.getResult(procId);
428    ProcedureTestingUtility.assertProcNotFailed(result);
429    assertEquals(19, Bytes.toInt(result.getResult()));
430    assertEquals(4, procExecutor.getLastProcId());
431  }
432
433  @Test
434  public void testStateMachineRecovery() throws Exception {
435    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
436    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);
437
438    // Step 1 - kill
439    Procedure proc = new TestStateMachineProcedure();
440    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
441    assertFalse(procExecutor.isRunning());
442
443    // Step 1 exec && Step 2 - kill
444    restart();
445    waitProcedure(procId);
446    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
447    assertFalse(procExecutor.isRunning());
448
449    // Step 2 exec && step 3 - kill
450    restart();
451    waitProcedure(procId);
452    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
453    assertFalse(procExecutor.isRunning());
454
455    // Step 3 exec
456    restart();
457    waitProcedure(procId);
458    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
459    assertFalse(procExecutor.isRunning());
460
461    restart();
462    waitProcedure(procId);
463    assertTrue(procExecutor.isRunning());
464
465    // The procedure is completed
466    Procedure<?> result = procExecutor.getResult(procId);
467    ProcedureTestingUtility.assertProcNotFailed(result);
468    assertEquals(26, Bytes.toInt(result.getResult()));
469  }
470
471  @Test
472  public void testStateMachineRollbackRecovery() throws Exception {
473    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
474    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);
475
476    // Step 1 - kill
477    Procedure proc = new TestStateMachineProcedure();
478    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
479    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
480    assertFalse(procExecutor.isRunning());
481
482    // Step 1 exec && Step 2 - kill
483    restart();
484    waitProcedure(procId);
485    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
486    assertFalse(procExecutor.isRunning());
487
488    // Step 2 exec && step 3 - kill
489    restart();
490    waitProcedure(procId);
491    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
492    assertFalse(procExecutor.isRunning());
493
494    // Step 3 exec - rollback step 3 - kill
495    procSleepInterval = 2500;
496    restart();
497    assertTrue(procExecutor.abort(procId));
498    waitProcedure(procId);
499    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
500    assertFalse(procExecutor.isRunning());
501
502    // Rollback step 3 - rollback step 2 - kill
503    restart();
504    waitProcedure(procId);
505    assertFalse(procExecutor.isRunning());
506    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
507
508    // Rollback step 2 - step 1 - kill
509    restart();
510    waitProcedure(procId);
511    assertFalse(procExecutor.isRunning());
512    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
513
514    // Rollback step 1 - complete
515    restart();
516    waitProcedure(procId);
517    assertTrue(procExecutor.isRunning());
518
519    // The procedure is completed
520    Procedure<?> result = procExecutor.getResult(procId);
521    ProcedureTestingUtility.assertIsAbortException(result);
522  }
523
524  private void waitProcedure(final long procId) {
525    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
526    dumpLogDirState();
527  }
528
529  private void dumpLogDirState() {
530    try {
531      FileStatus[] files = fs.listStatus(logDir);
532      if (files != null && files.length > 0) {
533        for (FileStatus file : files) {
534          assertTrue(file.toString(), file.isFile());
535          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
536        }
537      } else {
538        LOG.debug("no files under: " + logDir);
539      }
540    } catch (IOException e) {
541      LOG.warn("Unable to dump " + logDir, e);
542    }
543  }
544
545  private static class TestProcEnv {
546    private CountDownLatch latch = null;
547
548    /**
549     * set/unset a latch. every procedure execute() step will wait on the latch if any.
550     */
551    public void setWaitLatch(CountDownLatch latch) {
552      this.latch = latch;
553    }
554
555    public void waitOnLatch() throws InterruptedException {
556      if (latch != null) {
557        latch.await();
558      }
559    }
560  }
561}