/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Tests how a {@link ProcedureExecutor} backed by a WAL {@link ProcedureStore} executes, rolls
 * back and times out procedures.
 * <p>
 * Each helper procedure appends a {@code "<name>-<step>"} entry to a shared {@code state} list
 * whenever one of its callbacks runs; the tests then assert on the exact sequence recorded there.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestProcedureExecution {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureExecution.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
  /** Timeout, in milliseconds, set on the waiting procedures of the abort-timeout tests. */
  private static final int PROC_TIMEOUT_MSEC = 2500;
  /** Typed {@code null} so it can be passed as a single element of a varargs sub-procedure list. */
  private static final Procedure<?> NULL_PROC = null;

  private ProcedureExecutor<Void> procExecutor;
  private ProcedureStore procStore;

  private HBaseCommonTestingUtil htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Creates a WAL procedure store under the test data directory and starts an executor with
   * {@link #PROCEDURE_EXECUTOR_SLOTS} slots on top of it.
   */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtil();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
  }

  /** Stops the executor and the store, then removes the procedure log directory. */
  @After
  public void tearDown() throws IOException {
    procExecutor.stop();
    procStore.stop(false);
    fs.delete(logDir, true);
  }

  /** Marker exception used to recognise failures injected by these tests. */
  private static class TestProcedureException extends IOException {

    private static final long serialVersionUID = 8798565784658913798L;

    public TestProcedureException(String msg) {
      super(msg);
    }
  }

  /**
   * Procedure that records its execute/rollback/abort calls in a shared list and, on execute,
   * either fails with a configured exception or returns a fixed array of sub-procedures.
   */
  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
    private final Procedure<Void>[] subProcs;
    private final List<String> state;
    private final Exception failure;
    private final String name;

    /**
     * No-arg constructor that always throws; per its message it would only be reached on
     * recovery, which these tests are not expected to trigger.
     */
    public TestSequentialProcedure() {
      throw new UnsupportedOperationException("recovery should not be triggered here");
    }

    /**
     * @param name     prefix used for the entries appended to {@code state}
     * @param state    shared list that receives one entry per callback invocation
     * @param subProcs sub-procedures returned from {@link #execute(Void)}; may be empty
     */
    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
      this.state = state;
      this.subProcs = subProcs;
      this.name = name;
      this.failure = null;
    }

    /**
     * @param name    prefix used for the entries appended to {@code state}
     * @param state   shared list that receives one entry per callback invocation
     * @param failure exception reported as the procedure failure from {@link #execute(Void)}
     */
    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
      this.state = state;
      this.subProcs = null;
      this.name = name;
      this.failure = failure;
    }

    /**
     * Records {@code "<name>-execute"}; then marks the procedure failed if a failure was
     * configured, otherwise returns the configured sub-procedures.
     */
    @Override
    protected Procedure<Void>[] execute(Void env) {
      state.add(name + "-execute");
      if (failure != null) {
        setFailure(new RemoteProcedureException(name + "-failure", failure));
        return null;
      }
      return subProcs;
    }

    /** Records {@code "<name>-rollback"}. */
    @Override
    protected void rollback(Void env) {
      state.add(name + "-rollback");
    }

    /** Records {@code "<name>-abort"} and reports the abort as accepted. */
    @Override
    protected boolean abort(Void env) {
      state.add(name + "-abort");
      return true;
    }
  }

  /**
   * A sub-procedure list containing a {@code null} entry must fail the root procedure with an
   * illegal-argument error and roll back the procedures that already executed.
   */
  @Test
  public void testBadSubprocList() {
    List<String> state = new ArrayList<>();
    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);

    // subProc1 has a "null" subprocedure which is caught as InvalidArgument
    // failed state with 2 execute and 2 rollback
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    ProcedureTestingUtility.assertIsIllegalArgumentException(result);

    assertEquals(state.toString(), 4, state.size());
    assertEquals("rootProc-execute", state.get(0));
    assertEquals("subProc1-execute", state.get(1));
    assertEquals("subProc1-rollback", state.get(2));
    assertEquals("rootProc-rollback", state.get(3));
  }

  /** A three-level chain of procedures with no failure executes each level exactly once. */
  @Test
  public void testSingleSequentialProc() {
    List<String> state = new ArrayList<>();
    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);

    // successful state, with 3 execute
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    ProcedureTestingUtility.assertProcNotFailed(result);
    assertEquals(state.toString(), 3, state.size());
  }

  /**
   * When the innermost procedure fails, all three procedures are rolled back in reverse order of
   * execution and the injected exception is reported as the cause.
   */
  @Test
  public void testSingleSequentialProcRollback() {
    List<String> state = new ArrayList<>();
    Procedure<Void> subProc2 =
      new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test"));
    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);

    // the 3rd proc fail, rollback after 2 successful execution
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    LOG.info(result.getException().getMessage());
    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
    assertTrue("expected TestProcedureException, got " + cause,
      cause instanceof TestProcedureException);

    assertEquals(state.toString(), 6, state.size());
    assertEquals("rootProc-execute", state.get(0));
    assertEquals("subProc1-execute", state.get(1));
    assertEquals("subProc2-execute", state.get(2));
    assertEquals("subProc2-rollback", state.get(3));
    assertEquals("subProc1-rollback", state.get(4));
    assertEquals("rootProc-rollback", state.get(5));
  }

  /**
   * Procedure that always fails on execute and whose rollback throws an {@link IOException} on
   * its first two invocations before succeeding on the third.
   */
  public static class TestFaultyRollback extends SequentialProcedure<Void> {
    /** Number of rollback invocations seen so far. */
    private int retries = 0;

    public TestFaultyRollback() {
    }

    /** Marks the procedure as failed with a {@link TestProcedureException}. */
    @Override
    protected Procedure<Void>[] execute(Void env) {
      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
      return null;
    }

    /**
     * Fails while the invocation count is below 3, then completes normally.
     * @throws IOException on the first and second invocation
     */
    @Override
    protected void rollback(Void env) throws IOException {
      if (++retries < 3) {
        LOG.info("inject rollback failure {}", retries);
        throw new IOException("injected failure number " + retries);
      }
      LOG.info("execute non faulty rollback step retries={}", retries);
    }

    /** Abort is never accepted. */
    @Override
    protected boolean abort(Void env) {
      return false;
    }
  }

  /**
   * A rollback that throws a couple of times must still leave the procedure failed with the
   * original execute failure as the cause.
   */
  @Test
  public void testRollbackRetriableFailure() {
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());

    Procedure<?> result = procExecutor.getResult(procId);
    assertTrue("expected a failure", result.isFailed());
    LOG.info(result.getException().getMessage());
    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
    assertTrue("expected TestProcedureException, got " + cause,
      cause instanceof TestProcedureException);
  }

  /**
   * Procedure that puts itself in {@link ProcedureState#WAITING_TIMEOUT} on execute, optionally
   * spawning a single {@link TestWaitChild}, and records its callbacks in a shared list.
   */
  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
    private final List<String> state;
    private final boolean hasChild;
    private final String name;

    /**
     * No-arg constructor that always throws; per its message it would only be reached on
     * recovery, which these tests are not expected to trigger.
     */
    public TestWaitingProcedure() {
      throw new UnsupportedOperationException("recovery should not be triggered here");
    }

    /**
     * @param name     prefix used for the entries appended to {@code state}
     * @param state    shared list that receives one entry per callback invocation
     * @param hasChild whether {@link #execute(Void)} returns a {@link TestWaitChild}
     */
    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
      this.hasChild = hasChild;
      this.state = state;
      this.name = name;
    }

    /**
     * Records {@code "<name>-execute"}, switches to {@code WAITING_TIMEOUT} and returns either a
     * single child procedure or {@code null}.
     */
    @Override
    protected Procedure<Void>[] execute(Void env) {
      state.add(name + "-execute");
      // The tests set a timeout on this procedure and expect it to fail with a timeout error.
      setState(ProcedureState.WAITING_TIMEOUT);
      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
    }

    /** Records {@code "<name>-rollback"}. */
    @Override
    protected void rollback(Void env) {
      state.add(name + "-rollback");
    }

    /** Records {@code "<name>-abort"} and reports the abort as accepted. */
    @Override
    protected boolean abort(Void env) {
      state.add(name + "-abort");
      return true;
    }

    /** Child of {@link TestWaitingProcedure} that completes immediately and must not roll back. */
    public static class TestWaitChild extends SequentialProcedure<Void> {
      private final List<String> state;
      private final String name;

      /**
       * No-arg constructor that always throws; per its message it would only be reached on
       * recovery, which these tests are not expected to trigger.
       */
      public TestWaitChild() {
        throw new UnsupportedOperationException("recovery should not be triggered here");
      }

      /**
       * @param name  prefix (the parent's name) used for the entries appended to {@code state}
       * @param state shared list that receives one entry per callback invocation
       */
      public TestWaitChild(String name, List<String> state) {
        this.name = name;
        this.state = state;
      }

      /** Records {@code "<name>-child-execute"} and finishes without sub-procedures. */
      @Override
      protected Procedure<Void>[] execute(Void env) {
        state.add(name + "-child-execute");
        return null;
      }

      /** Always throws: the test treats a rollback of this child as an error. */
      @Override
      protected void rollback(Void env) {
        throw new UnsupportedOperationException("should not rollback a successful child procedure");
      }

      /** Records {@code "<name>-child-abort"} and reports the abort as accepted. */
      @Override
      protected boolean abort(Void env) {
        state.add(name + "-child-abort");
        return true;
      }
    }
  }

  /**
   * A waiting procedure without children fails with a timeout no earlier than its configured
   * timeout and is rolled back.
   */
  @Test
  public void testAbortTimeout() {
    List<String> state = new ArrayList<>();
    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false);
    proc.setTimeout(PROC_TIMEOUT_MSEC);
    long startTime = EnvironmentEdgeManager.currentTime();
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info(Objects.toString(state));
    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    ProcedureTestingUtility.assertIsTimeoutException(result);
    assertEquals(state.toString(), 2, state.size());
    assertEquals("wproc-execute", state.get(0));
    assertEquals("wproc-rollback", state.get(1));
  }

  /**
   * A waiting procedure with a child fails with a timeout after the child has executed; only the
   * parent is rolled back.
   */
  @Test
  public void testAbortTimeoutWithChildren() {
    List<String> state = new ArrayList<>();
    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true);
    proc.setTimeout(PROC_TIMEOUT_MSEC);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    ProcedureTestingUtility.assertIsTimeoutException(result);
    assertEquals(state.toString(), 3, state.size());
    assertEquals("wproc-execute", state.get(0));
    assertEquals("wproc-child-execute", state.get(1));
    assertEquals("wproc-rollback", state.get(2));
  }
}