/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.TestProcedureRecovery.TestStateMachineProcedure.State;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;

/**
 * Exercises procedure execution and rollback across {@link ProcedureExecutor} restarts.
 * <p>
 * Most tests follow the same rhythm: submit a procedure, wait, assert that the executor is no
 * longer running, restart it, and repeat until the procedure reaches its final state. The step
 * comments inside each test describe which step is expected to run between two restarts.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestProcedureRecovery {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureRecovery.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  // These are static although they are re-created in setUp(): the static nested procedure classes
  // below read procExecutor and procSleepInterval directly.
  private static TestProcEnv procEnv;
  private static ProcedureExecutor<TestProcEnv> procExecutor;
  private static ProcedureStore procStore;
  /** Milliseconds slept inside some procedure steps; reset to 0 before every test. */
  private static int procSleepInterval;

  private HBaseCommonTestingUtil htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Creates a fresh store under {@code <testDir>/proc-logs}, a fresh executor with a
   * {@link ProcedureExecutor.Testing} hook object attached, and starts both.
   * @throws IOException if the file system or the store cannot be set up
   */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtil();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procEnv = new TestProcEnv();
    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
    procExecutor.testing = new ProcedureExecutor.Testing();
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
    procSleepInterval = 0;
  }

  /**
   * Stops the executor and the store, then recursively deletes the log directory.
   * @throws IOException if the log directory cannot be deleted
   */
  @After
  public void tearDown() throws IOException {
    procExecutor.stop();
    procStore.stop(false);
    fs.delete(logDir, true);
  }

  /**
   * Restarts the executor through {@link ProcedureTestingUtility#restart}, logging the content of
   * the log directory before and after.
   */
  private void restart() throws Exception {
    dumpLogDirState();
    ProcedureTestingUtility.restart(procExecutor);
    dumpLogDirState();
  }

  /**
   * Procedure with a single execute step: waits on the environment latch (if one is set),
   * increments its step counter and publishes the counter as the procedure result.
   */
  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
    private int step = 0;

    public TestSingleStepProcedure() {
    }

    @Override
    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
      env.waitOnLatch();
      LOG.debug("execute procedure {} step={}", this, step);
      step++;
      setResult(Bytes.toBytes(step));
      return null;
    }

    @Override
    protected void rollback(TestProcEnv env) {
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      return true;
    }
  }

  /**
   * Base class for the multi-step procedures. Every execute and rollback call flips the executor's
   * kill-before-store-update test hooks through {@link ProcedureTestingUtility} and bumps the step
   * counter. An abort requested on this procedure, or on any of its ancestors, turns the next
   * execute call into a failure.
   */
  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
    private final AtomicBoolean abort = new AtomicBoolean(false);
    private int step = 0;

    @Override
    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
      env.waitOnLatch();
      LOG.debug("execute procedure {} step={}", this, step);
      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
      step++;
      // Sleeping before the abort check gives a test the time to request the abort.
      Threads.sleepWithoutInterrupt(procSleepInterval);
      if (isAborted()) {
        setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException(
          "got an abort at " + getClass().getName() + " step=" + step)));
      }
      // This base step never spawns children; subclasses decide that after calling super.
      return null;
    }

    @Override
    protected void rollback(TestProcEnv env) {
      LOG.debug("rollback procedure {} step={}", this, step);
      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
      step++;
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      abort.set(true);
      return true;
    }

    /**
     * Returns whether this procedure, or any procedure up its parent chain, has been asked to
     * abort.
     */
    private boolean isAborted() {
      boolean aborted = abort.get();
      BaseTestStepProcedure proc = this;
      while (proc.hasParent() && !aborted) {
        // NOTE(review): the cast assumes every ancestor is a BaseTestStepProcedure, which holds
        // for the procedures defined in this file.
        proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId());
        aborted = proc.isAborted();
      }
      return aborted;
    }
  }

  /**
   * Three-level procedure chain: this procedure spawns a {@link Step1Procedure}, which spawns a
   * {@link Step2Procedure}, which has no children. A level that failed spawns nothing.
   */
  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
    public TestMultiStepProcedure() {
    }

    @Override
    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
      super.execute(env);
      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
    }

    /** Second level of the chain; spawns {@link Step2Procedure} unless it failed. */
    public static class Step1Procedure extends BaseTestStepProcedure {
      public Step1Procedure() {
      }

      @Override
      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
        super.execute(env);
        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
      }
    }

    /** Last level of the chain; inherits the base behaviour unchanged. */
    public static class Step2Procedure extends BaseTestStepProcedure {
      public Step2Procedure() {
      }
    }
  }

  /** Restarting an executor that has no procedures must simply work. */
  @Test
  public void testNoopLoad() throws Exception {
    restart();
  }

  /**
   * A single-step procedure submitted with {@code killBeforeStoreUpdate} set must run again after
   * a restart, and its result must survive a further restart untouched.
   */
  @Test
  public void testSingleStepProcRecovery() throws Exception {
    Procedure<TestProcEnv> proc = new TestSingleStepProcedure();
    procExecutor.testing.killBeforeStoreUpdate = true;
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    assertFalse(procExecutor.isRunning());
    procExecutor.testing.killBeforeStoreUpdate = false;

    // Restart and verify that the procedures restart
    long restartTs = EnvironmentEdgeManager.currentTime();
    restart();
    waitProcedure(procId);
    Procedure<?> result = procExecutor.getResult(procId);
    assertTrue(result.getLastUpdate() > restartTs);
    ProcedureTestingUtility.assertProcNotFailed(result);
    assertEquals(1, Bytes.toInt(result.getResult()));
    long resultTs = result.getLastUpdate();

    // Verify that after another restart the result is still there
    restart();
    result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcNotFailed(result);
    assertEquals(resultTs, result.getLastUpdate());
    assertEquals(1, Bytes.toInt(result.getResult()));
  }

  /**
   * Drives a {@link TestMultiStepProcedure} to successful completion, one step per restart.
   */
  @Test
  public void testMultiStepProcRecovery() throws Exception {
    // Step 0 - kill
    Procedure<TestProcEnv> proc = new TestMultiStepProcedure();
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    assertFalse(procExecutor.isRunning());

    // Step 0 exec && Step 1 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 1 exec && step 2 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 2 exec
    restart();
    waitProcedure(procId);
    assertTrue(procExecutor.isRunning());

    // The procedure is completed
    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcNotFailed(result);
  }

  /**
   * Drives a {@link TestMultiStepProcedure} to its last step, aborts it there, and then follows
   * the rollback across restarts until the procedure ends with an abort exception.
   */
  @Test
  public void testMultiStepRollbackRecovery() throws Exception {
    // Step 0 - kill
    Procedure<TestProcEnv> proc = new TestMultiStepProcedure();
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    assertFalse(procExecutor.isRunning());

    // Step 0 exec && Step 1 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 1 exec && step 2 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 2 exec - rollback - kill
    // The step sleeps for procSleepInterval before checking the abort flag, which leaves room for
    // the abort() below to be seen by that step.
    procSleepInterval = 2500;
    restart();
    assertTrue(procExecutor.abort(procId));
    waitProcedure(procId);
    assertFalse(procExecutor.isRunning());

    // rollback - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // rollback - complete
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Restart the executor and get the result
    restart();
    waitProcedure(procId);

    // The procedure is completed
    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertIsAbortException(result);
  }

  /**
   * State machine procedure that adds a fixed amount to {@code iResult} in each state (3, 5, 7 and
   * 11) so that the final result reveals which states were executed. {@code iResult} is part of
   * the serialized state.
   * <p>
   * When built with {@code testSubmitChildProc == true} it adds two children in STATE_2, jumps
   * straight to DONE (skipping STATE_3) and adds one more child there.
   */
  public static class TestStateMachineProcedure
    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
    enum State {
      STATE_1,
      STATE_2,
      STATE_3,
      DONE
    }

    private final AtomicBoolean aborted = new AtomicBoolean(false);
    private int iResult = 0;
    private boolean submitChildProc = false;

    public TestStateMachineProcedure() {
    }

    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
      this.submitChildProc = testSubmitChildProc;
    }

    @Override
    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
      switch (state) {
        case STATE_1:
          LOG.info("execute step 1 {}", this);
          setNextState(State.STATE_2);
          iResult += 3;
          break;
        case STATE_2:
          LOG.info("execute step 2 {}", this);
          if (submitChildProc) {
            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
            setNextState(State.DONE);
          } else {
            setNextState(State.STATE_3);
          }
          iResult += 5;
          break;
        case STATE_3:
          LOG.info("execute step 3 {}", this);
          // Sleeping before the abort check gives a test the time to request the abort.
          Threads.sleepWithoutInterrupt(procSleepInterval);
          if (aborted.get()) {
            LOG.info("aborted step 3 {}", this);
            setAbortFailure("test", "aborted");
            break;
          }
          setNextState(State.DONE);
          iResult += 7;
          break;
        case DONE:
          if (submitChildProc) {
            addChildProcedure(new TestStateMachineProcedure());
          }
          iResult += 11;
          setResult(Bytes.toBytes(iResult));
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException();
      }
      return Flow.HAS_MORE_STATE;
    }

    @Override
    protected boolean isRollbackSupported(State state) {
      return true;
    }

    @Override
    protected void rollbackState(TestProcEnv env, final State state) {
      // Rolling back only logs; DONE has no rollback and falls into the default branch.
      switch (state) {
        case STATE_1:
          LOG.info("rollback step 1 {}", this);
          break;
        case STATE_2:
          LOG.info("rollback step 2 {}", this);
          break;
        case STATE_3:
          LOG.info("rollback step 3 {}", this);
          break;
        default:
          throw new UnsupportedOperationException();
      }
    }

    @Override
    protected State getState(final int stateId) {
      // State ids are the enum ordinals, see getStateId().
      return State.values()[stateId];
    }

    @Override
    protected int getStateId(final State state) {
      return state.ordinal();
    }

    @Override
    protected State getInitialState() {
      return State.STATE_1;
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      aborted.set(true);
      return true;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
      super.serializeStateData(serializer);
      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
      serializer.serialize(builder.build());
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
      super.deserializeStateData(serializer);
      Int32Value value = serializer.deserialize(Int32Value.class);
      iResult = value.getValue();
    }
  }

  /**
   * Runs a state machine procedure that spawns children at two different states and checks the
   * parent's result and the number of procedures created.
   */
  @Test
  public void testStateMachineMultipleLevel() throws Exception {
    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
    // Wait the completion
    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcNotFailed(result);
    // 3 (STATE_1) + 5 (STATE_2) + 11 (DONE); STATE_3 is skipped when children are submitted.
    assertEquals(19, Bytes.toInt(result.getResult()));
    // Parent + two children from STATE_2 + one child from DONE.
    // NOTE(review): presumably ids are handed out sequentially starting at 1 - confirm.
    assertEquals(4, procExecutor.getLastProcId());
  }

  /**
   * Drives a {@link TestStateMachineProcedure} to successful completion, one state per restart.
   */
  @Test
  public void testStateMachineRecovery() throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);

    // Step 1 - kill
    Procedure<TestProcEnv> proc = new TestStateMachineProcedure();
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    assertFalse(procExecutor.isRunning());

    // Step 1 exec && Step 2 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 2 exec && step 3 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 3 exec
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    restart();
    waitProcedure(procId);
    assertTrue(procExecutor.isRunning());

    // The procedure is completed
    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertProcNotFailed(result);
    // 3 (STATE_1) + 5 (STATE_2) + 7 (STATE_3) + 11 (DONE)
    assertEquals(26, Bytes.toInt(result.getResult()));
  }

  /**
   * Drives a {@link TestStateMachineProcedure} to STATE_3, aborts it there, and then follows the
   * rollback across restarts until the procedure ends with an abort exception.
   */
  @Test
  public void testStateMachineRollbackRecovery() throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);

    // Step 1 - kill
    Procedure<TestProcEnv> proc = new TestStateMachineProcedure();
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 1 exec && Step 2 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 2 exec && step 3 - kill
    restart();
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Step 3 exec - rollback step 3 - kill
    // STATE_3 sleeps for procSleepInterval before checking the abort flag, which leaves room for
    // the abort() below to be seen by that step.
    procSleepInterval = 2500;
    restart();
    assertTrue(procExecutor.abort(procId));
    waitProcedure(procId);
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
    assertFalse(procExecutor.isRunning());

    // Rollback step 3 - rollback step 2 - kill
    restart();
    waitProcedure(procId);
    assertFalse(procExecutor.isRunning());
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);

    // Rollback step 2 - step 1 - kill
    restart();
    waitProcedure(procId);
    assertFalse(procExecutor.isRunning());
    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);

    // Rollback step 1 - complete
    restart();
    waitProcedure(procId);
    assertTrue(procExecutor.isRunning());

    // The procedure is completed
    Procedure<?> result = procExecutor.getResult(procId);
    ProcedureTestingUtility.assertIsAbortException(result);
  }

  /**
   * Waits for the given procedure through {@link ProcedureTestingUtility#waitProcedure} and then
   * logs the content of the log directory.
   */
  private void waitProcedure(final long procId) {
    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
    dumpLogDirState();
  }

  /**
   * Logs path and size of every entry under the log directory and asserts that each entry is a
   * plain file. Listing failures are logged and otherwise ignored: this is a debugging aid.
   */
  private void dumpLogDirState() {
    try {
      FileStatus[] files = fs.listStatus(logDir);
      if (files != null && files.length > 0) {
        for (FileStatus file : files) {
          assertTrue(file.toString(), file.isFile());
          LOG.debug("log file {} size={}", file.getPath(), file.getLen());
        }
      } else {
        LOG.debug("no files under: {}", logDir);
      }
    } catch (IOException e) {
      LOG.warn("Unable to dump {}", logDir, e);
    }
  }

  /** Procedure environment carrying an optional latch that execute() steps wait on. */
  private static class TestProcEnv {
    private CountDownLatch latch = null;

    /**
     * set/unset a latch. every procedure execute() step will wait on the latch if any.
     */
    public void setWaitLatch(CountDownLatch latch) {
      this.latch = latch;
    }

    /**
     * Blocks until the latch reaches zero; returns immediately when no latch is set.
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitOnLatch() throws InterruptedException {
      if (latch != null) {
        latch.await();
      }
    }
  }
}