001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicBoolean; 027import org.apache.hadoop.fs.FileStatus; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 047 048@Category({ MasterTests.class, MediumTests.class }) 049public class TestProcedureRecovery { 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestProcedureRecovery.class); 053 054 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class); 055 056 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 057 058 private static TestProcEnv procEnv; 059 private static ProcedureExecutor<TestProcEnv> procExecutor; 060 private static ProcedureStore procStore; 061 private static int procSleepInterval; 062 063 private HBaseCommonTestingUtility htu; 064 private FileSystem fs; 065 private Path testDir; 066 private Path logDir; 067 068 @Before 069 public void setUp() throws IOException { 070 htu = new HBaseCommonTestingUtility(); 071 testDir = htu.getDataTestDir(); 072 fs = testDir.getFileSystem(htu.getConfiguration()); 073 assertTrue(testDir.depth() > 1); 074 075 logDir = new Path(testDir, "proc-logs"); 076 procEnv = new TestProcEnv(); 077 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 078 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 079 procExecutor.testing = new ProcedureExecutor.Testing(); 080 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 081 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 082 procSleepInterval = 0; 083 } 084 085 @After 086 public void tearDown() throws IOException { 087 procExecutor.stop(); 088 procStore.stop(false); 089 fs.delete(logDir, true); 090 } 091 092 private void restart() throws Exception { 093 dumpLogDirState(); 094 ProcedureTestingUtility.restart(procExecutor); 095 dumpLogDirState(); 096 } 097 098 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 099 private int step = 0; 100 101 public TestSingleStepProcedure() { 102 } 103 104 @Override 105 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 106 env.waitOnLatch(); 107 LOG.debug("execute procedure " + this + " step=" + step); 108 step++; 109 setResult(Bytes.toBytes(step)); 110 return null; 111 } 112 113 @Override 114 protected void rollback(TestProcEnv env) { 115 } 116 117 @Override 118 protected boolean abort(TestProcEnv env) { 119 return true; 120 } 121 } 122 123 public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { 124 private AtomicBoolean abort = new AtomicBoolean(false); 125 private int step = 0; 126 127 @Override 128 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 129 env.waitOnLatch(); 130 LOG.debug("execute procedure " + this + " step=" + step); 131 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 132 ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor); 133 step++; 134 Threads.sleepWithoutInterrupt(procSleepInterval); 135 if (isAborted()) { 136 setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException( 137 "got an abort at " + getClass().getName() + " step=" + step))); 138 return null; 139 } 140 return null; 141 } 142 143 @Override 144 protected void rollback(TestProcEnv env) { 145 LOG.debug("rollback procedure " + this + " step=" + step); 146 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 147 ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor); 148 step++; 149 } 150 151 @Override 152 protected boolean abort(TestProcEnv env) { 153 abort.set(true); 154 return true; 155 } 156 157 private boolean isAborted() { 158 boolean aborted = abort.get(); 159 BaseTestStepProcedure proc = this; 160 while (proc.hasParent() && !aborted) { 161 proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId()); 162 aborted = proc.isAborted(); 163 } 164 return aborted; 165 } 166 } 167 168 public static class TestMultiStepProcedure extends BaseTestStepProcedure { 169 public TestMultiStepProcedure() { 170 } 171 172 @Override 173 public Procedure[] execute(TestProcEnv env) throws InterruptedException { 174 super.execute(env); 175 return isFailed() ? null : new Procedure[] { new Step1Procedure() }; 176 } 177 178 public static class Step1Procedure extends BaseTestStepProcedure { 179 public Step1Procedure() { 180 } 181 182 @Override 183 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 184 super.execute(env); 185 return isFailed() ? null : new Procedure[] { new Step2Procedure() }; 186 } 187 } 188 189 public static class Step2Procedure extends BaseTestStepProcedure { 190 public Step2Procedure() { 191 } 192 } 193 } 194 195 @Test 196 public void testNoopLoad() throws Exception { 197 restart(); 198 } 199 200 @Test 201 public void testSingleStepProcRecovery() throws Exception { 202 Procedure proc = new TestSingleStepProcedure(); 203 procExecutor.testing.killBeforeStoreUpdate = true; 204 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 205 assertFalse(procExecutor.isRunning()); 206 procExecutor.testing.killBeforeStoreUpdate = false; 207 208 // Restart and verify that the procedures restart 209 long restartTs = EnvironmentEdgeManager.currentTime(); 210 restart(); 211 waitProcedure(procId); 212 Procedure<?> result = procExecutor.getResult(procId); 213 assertTrue(result.getLastUpdate() > restartTs); 214 ProcedureTestingUtility.assertProcNotFailed(result); 215 assertEquals(1, Bytes.toInt(result.getResult())); 216 long resultTs = result.getLastUpdate(); 217 218 // Verify that after another restart the result is still there 219 restart(); 220 result = procExecutor.getResult(procId); 221 ProcedureTestingUtility.assertProcNotFailed(result); 222 assertEquals(resultTs, result.getLastUpdate()); 223 assertEquals(1, Bytes.toInt(result.getResult())); 224 } 225 226 @Test 227 public void testMultiStepProcRecovery() throws Exception { 228 // Step 0 - kill 229 Procedure proc = new TestMultiStepProcedure(); 230 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 231 assertFalse(procExecutor.isRunning()); 232 233 // Step 0 exec && Step 1 - kill 234 restart(); 235 waitProcedure(procId); 236 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 237 assertFalse(procExecutor.isRunning()); 238 239 // Step 1 exec && step 2 - kill 240 restart(); 241 waitProcedure(procId); 242 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 243 assertFalse(procExecutor.isRunning()); 244 245 // Step 2 exec 246 restart(); 247 waitProcedure(procId); 248 assertTrue(procExecutor.isRunning()); 249 250 // The procedure is completed 251 Procedure<?> result = procExecutor.getResult(procId); 252 ProcedureTestingUtility.assertProcNotFailed(result); 253 } 254 255 @Test 256 public void testMultiStepRollbackRecovery() throws Exception { 257 // Step 0 - kill 258 Procedure proc = new TestMultiStepProcedure(); 259 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 260 assertFalse(procExecutor.isRunning()); 261 262 // Step 0 exec && Step 1 - kill 263 restart(); 264 waitProcedure(procId); 265 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 266 assertFalse(procExecutor.isRunning()); 267 268 // Step 1 exec && step 2 - kill 269 restart(); 270 waitProcedure(procId); 271 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 272 assertFalse(procExecutor.isRunning()); 273 274 // Step 2 exec - rollback - kill 275 procSleepInterval = 2500; 276 restart(); 277 assertTrue(procExecutor.abort(procId)); 278 waitProcedure(procId); 279 assertFalse(procExecutor.isRunning()); 280 281 // rollback - kill 282 restart(); 283 waitProcedure(procId); 284 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 285 assertFalse(procExecutor.isRunning()); 286 287 // rollback - complete 288 restart(); 289 waitProcedure(procId); 290 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 291 assertFalse(procExecutor.isRunning()); 292 293 // Restart the executor and get the result 294 restart(); 295 waitProcedure(procId); 296 297 // The procedure is completed 298 Procedure<?> result = procExecutor.getResult(procId); 299 ProcedureTestingUtility.assertIsAbortException(result); 300 } 301 302 public static class TestStateMachineProcedure 303 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 304 enum State { 305 STATE_1, 306 STATE_2, 307 STATE_3, 308 DONE 309 } 310 311 public TestStateMachineProcedure() { 312 } 313 314 public TestStateMachineProcedure(final boolean testSubmitChildProc) { 315 this.submitChildProc = testSubmitChildProc; 316 } 317 318 private AtomicBoolean aborted = new AtomicBoolean(false); 319 private int iResult = 0; 320 private boolean submitChildProc = false; 321 322 @Override 323 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { 324 switch (state) { 325 case STATE_1: 326 LOG.info("execute step 1 " + this); 327 setNextState(State.STATE_2); 328 iResult += 3; 329 break; 330 case STATE_2: 331 LOG.info("execute step 2 " + this); 332 if (submitChildProc) { 333 addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); 334 setNextState(State.DONE); 335 } else { 336 setNextState(State.STATE_3); 337 } 338 iResult += 5; 339 break; 340 case STATE_3: 341 LOG.info("execute step 3 " + this); 342 Threads.sleepWithoutInterrupt(procSleepInterval); 343 if (aborted.get()) { 344 LOG.info("aborted step 3 " + this); 345 setAbortFailure("test", "aborted"); 346 break; 347 } 348 setNextState(State.DONE); 349 iResult += 7; 350 break; 351 case DONE: 352 if (submitChildProc) { 353 addChildProcedure(new TestStateMachineProcedure()); 354 } 355 iResult += 11; 356 setResult(Bytes.toBytes(iResult)); 357 return Flow.NO_MORE_STATE; 358 default: 359 throw new UnsupportedOperationException(); 360 } 361 return Flow.HAS_MORE_STATE; 362 } 363 364 @Override 365 protected boolean isRollbackSupported(State state) { 366 return true; 367 } 368 369 @Override 370 protected void rollbackState(TestProcEnv env, final State state) { 371 switch (state) { 372 case STATE_1: 373 LOG.info("rollback step 1 " + this); 374 break; 375 case STATE_2: 376 LOG.info("rollback step 2 " + this); 377 break; 378 case STATE_3: 379 LOG.info("rollback step 3 " + this); 380 break; 381 default: 382 throw new UnsupportedOperationException(); 383 } 384 } 385 386 @Override 387 protected State getState(final int stateId) { 388 return State.values()[stateId]; 389 } 390 391 @Override 392 protected int getStateId(final State state) { 393 return state.ordinal(); 394 } 395 396 @Override 397 protected State getInitialState() { 398 return State.STATE_1; 399 } 400 401 @Override 402 protected boolean abort(TestProcEnv env) { 403 aborted.set(true); 404 return true; 405 } 406 407 @Override 408 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 409 super.serializeStateData(serializer); 410 Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); 411 serializer.serialize(builder.build()); 412 } 413 414 @Override 415 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 416 super.deserializeStateData(serializer); 417 Int32Value value = serializer.deserialize(Int32Value.class); 418 iResult = value.getValue(); 419 } 420 } 421 422 @Test 423 public void testStateMachineMultipleLevel() throws Exception { 424 long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); 425 // Wait the completion 426 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 427 Procedure<?> result = procExecutor.getResult(procId); 428 ProcedureTestingUtility.assertProcNotFailed(result); 429 assertEquals(19, Bytes.toInt(result.getResult())); 430 assertEquals(4, procExecutor.getLastProcId()); 431 } 432 433 @Test 434 public void testStateMachineRecovery() throws Exception { 435 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); 436 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); 437 438 // Step 1 - kill 439 Procedure proc = new TestStateMachineProcedure(); 440 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 441 assertFalse(procExecutor.isRunning()); 442 443 // Step 1 exec && Step 2 - kill 444 restart(); 445 waitProcedure(procId); 446 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 447 assertFalse(procExecutor.isRunning()); 448 449 // Step 2 exec && step 3 - kill 450 restart(); 451 waitProcedure(procId); 452 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 453 assertFalse(procExecutor.isRunning()); 454 455 // Step 3 exec 456 restart(); 457 waitProcedure(procId); 458 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 459 assertFalse(procExecutor.isRunning()); 460 461 restart(); 462 waitProcedure(procId); 463 assertTrue(procExecutor.isRunning()); 464 465 // The procedure is completed 466 Procedure<?> result = procExecutor.getResult(procId); 467 ProcedureTestingUtility.assertProcNotFailed(result); 468 assertEquals(26, Bytes.toInt(result.getResult())); 469 } 470 471 @Test 472 public void testStateMachineRollbackRecovery() throws Exception { 473 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); 474 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); 475 476 // Step 1 - kill 477 Procedure proc = new TestStateMachineProcedure(); 478 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 479 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 480 assertFalse(procExecutor.isRunning()); 481 482 // Step 1 exec && Step 2 - kill 483 restart(); 484 waitProcedure(procId); 485 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 486 assertFalse(procExecutor.isRunning()); 487 488 // Step 2 exec && step 3 - kill 489 restart(); 490 waitProcedure(procId); 491 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 492 assertFalse(procExecutor.isRunning()); 493 494 // Step 3 exec - rollback step 3 - kill 495 procSleepInterval = 2500; 496 restart(); 497 assertTrue(procExecutor.abort(procId)); 498 waitProcedure(procId); 499 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 500 assertFalse(procExecutor.isRunning()); 501 502 // Rollback step 3 - rollback step 2 - kill 503 restart(); 504 waitProcedure(procId); 505 assertFalse(procExecutor.isRunning()); 506 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 507 508 // Rollback step 2 - step 1 - kill 509 restart(); 510 waitProcedure(procId); 511 assertFalse(procExecutor.isRunning()); 512 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 513 514 // Rollback step 1 - complete 515 restart(); 516 waitProcedure(procId); 517 assertTrue(procExecutor.isRunning()); 518 519 // The procedure is completed 520 Procedure<?> result = procExecutor.getResult(procId); 521 ProcedureTestingUtility.assertIsAbortException(result); 522 } 523 524 private void waitProcedure(final long procId) { 525 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 526 dumpLogDirState(); 527 } 528 529 private void dumpLogDirState() { 530 try { 531 FileStatus[] files = fs.listStatus(logDir); 532 if (files != null && files.length > 0) { 533 for (FileStatus file : files) { 534 assertTrue(file.toString(), file.isFile()); 535 LOG.debug("log file " + file.getPath() + " size=" + file.getLen()); 536 } 537 } else { 538 LOG.debug("no files under: " + logDir); 539 } 540 } catch (IOException e) { 541 LOG.warn("Unable to dump " + logDir, e); 542 } 543 } 544 545 private static class TestProcEnv { 546 private CountDownLatch latch = null; 547 548 /** 549 * set/unset a latch. every procedure execute() step will wait on the latch if any. 550 */ 551 public void setWaitLatch(CountDownLatch latch) { 552 this.latch = latch; 553 } 554 555 public void waitOnLatch() throws InterruptedException { 556 if (latch != null) { 557 latch.await(); 558 } 559 } 560 } 561}