001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 032import org.apache.hadoop.hbase.procedure2.TestYieldProcedures.TestStateMachineProcedure.State; 033import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 034import org.apache.hadoop.hbase.testclassification.MasterTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.junit.After; 037import org.junit.Before; 038import org.junit.ClassRule; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044@Category({ MasterTests.class, SmallTests.class }) 045public class TestYieldProcedures { 046 @ClassRule 047 public static final HBaseClassTestRule CLASS_RULE = 048 HBaseClassTestRule.forClass(TestYieldProcedures.class); 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class); 051 052 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 053 private static final Procedure NULL_PROC = null; 054 055 private ProcedureExecutor<TestProcEnv> procExecutor; 056 private TestScheduler procRunnables; 057 private ProcedureStore procStore; 058 059 private HBaseCommonTestingUtil htu; 060 private FileSystem fs; 061 private Path testDir; 062 private Path logDir; 063 064 @Before 065 public void setUp() throws IOException { 066 htu = new HBaseCommonTestingUtil(); 067 testDir = htu.getDataTestDir(); 068 fs = testDir.getFileSystem(htu.getConfiguration()); 069 assertTrue(testDir.depth() > 1); 070 071 logDir = new Path(testDir, "proc-logs"); 072 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 073 procRunnables = new TestScheduler(); 074 procExecutor = 075 new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); 076 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 077 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 078 } 079 080 @After 081 public void tearDown() throws IOException { 082 procExecutor.stop(); 083 procStore.stop(false); 084 fs.delete(logDir, true); 085 } 086 087 @Test 088 public void testYieldEachExecutionStep() throws Exception { 089 final int NUM_STATES = 3; 090 091 TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3]; 092 for (int i = 0; i < procs.length; ++i) { 093 procs[i] = new TestStateMachineProcedure(true, false); 094 procExecutor.submitProcedure(procs[i]); 095 } 096 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 097 098 for (int i = 0; i < procs.length; ++i) { 099 assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size()); 100 101 // verify execution 102 int index = 0; 103 for (int execStep = 0; execStep < NUM_STATES; ++execStep) { 104 TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++); 105 assertEquals(false, info.isRollback()); 106 assertEquals(execStep, info.getStep().ordinal()); 107 } 108 109 // verify rollback 110 for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) { 111 TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++); 112 assertEquals(true, info.isRollback()); 113 assertEquals(execStep, info.getStep().ordinal()); 114 } 115 } 116 117 // check runnable queue stats 118 assertEquals(0, procRunnables.size()); 119 assertEquals(0, procRunnables.addFrontCalls); 120 assertEquals(15, procRunnables.addBackCalls); 121 assertEquals(12, procRunnables.yieldCalls); 122 assertEquals(16, procRunnables.pollCalls); 123 assertEquals(3, procRunnables.completionCalls); 124 } 125 126 @Test 127 public void testYieldOnInterrupt() throws Exception { 128 final int NUM_STATES = 3; 129 int count = 0; 130 131 TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true); 132 ProcedureTestingUtility.submitAndWait(procExecutor, proc); 133 134 // test execute (we execute steps twice, one has the IE the other completes) 135 assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size()); 136 for (int i = 0; i < NUM_STATES; ++i) { 137 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++); 138 assertEquals(false, info.isRollback()); 139 assertEquals(i, info.getStep().ordinal()); 140 141 info = proc.getExecutionInfo().get(count++); 142 assertEquals(false, info.isRollback()); 143 assertEquals(i, info.getStep().ordinal()); 144 } 145 146 // test rollback (we execute steps twice, rollback counts both IE and completed) 147 for (int i = NUM_STATES - 1; i >= 0; --i) { 148 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++); 149 assertEquals(true, info.isRollback()); 150 assertEquals(i, info.getStep().ordinal()); 151 } 152 153 for (int i = NUM_STATES - 1; i >= 0; --i) { 154 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++); 155 assertEquals(true, info.isRollback()); 156 assertEquals(0, info.getStep().ordinal()); 157 } 158 159 // check runnable queue stats 160 assertEquals(0, procRunnables.size()); 161 assertEquals(0, procRunnables.addFrontCalls); 162 assertEquals(11, procRunnables.addBackCalls); 163 assertEquals(10, procRunnables.yieldCalls); 164 assertEquals(12, procRunnables.pollCalls); 165 assertEquals(1, procRunnables.completionCalls); 166 } 167 168 @Test 169 public void testYieldException() { 170 TestYieldProcedure proc = new TestYieldProcedure(); 171 ProcedureTestingUtility.submitAndWait(procExecutor, proc); 172 assertEquals(6, proc.step); 173 174 // check runnable queue stats 175 assertEquals(0, procRunnables.size()); 176 assertEquals(0, procRunnables.addFrontCalls); 177 assertEquals(6, procRunnables.addBackCalls); 178 assertEquals(5, procRunnables.yieldCalls); 179 assertEquals(7, procRunnables.pollCalls); 180 assertEquals(1, procRunnables.completionCalls); 181 } 182 183 private static class TestProcEnv { 184 public final AtomicLong timestamp = new AtomicLong(0); 185 186 public long nextTimestamp() { 187 return timestamp.incrementAndGet(); 188 } 189 } 190 191 public static class TestStateMachineProcedure 192 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 193 enum State { 194 STATE_1, 195 STATE_2, 196 STATE_3 197 } 198 199 public static class ExecutionInfo { 200 private final boolean rollback; 201 private final long timestamp; 202 private final State step; 203 204 public ExecutionInfo(long timestamp, State step, boolean isRollback) { 205 this.timestamp = timestamp; 206 this.step = step; 207 this.rollback = isRollback; 208 } 209 210 public State getStep() { 211 return step; 212 } 213 214 public long getTimestamp() { 215 return timestamp; 216 } 217 218 public boolean isRollback() { 219 return rollback; 220 } 221 } 222 223 private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>(); 224 private final AtomicBoolean aborted = new AtomicBoolean(false); 225 private final boolean throwInterruptOnceOnEachStep; 226 private final boolean abortOnFinalStep; 227 228 public TestStateMachineProcedure() { 229 this(false, false); 230 } 231 232 public TestStateMachineProcedure(boolean abortOnFinalStep, 233 boolean throwInterruptOnceOnEachStep) { 234 this.abortOnFinalStep = abortOnFinalStep; 235 this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep; 236 } 237 238 public ArrayList<ExecutionInfo> getExecutionInfo() { 239 return executionInfo; 240 } 241 242 @Override 243 protected boolean isRollbackSupported(State state) { 244 return true; 245 } 246 247 @Override 248 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) 249 throws InterruptedException { 250 final long ts = env.nextTimestamp(); 251 LOG.info(getProcId() + " execute step " + state + " ts=" + ts); 252 executionInfo.add(new ExecutionInfo(ts, state, false)); 253 Thread.sleep(150); 254 255 if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) { 256 LOG.debug("THROW INTERRUPT"); 257 throw new InterruptedException("test interrupt"); 258 } 259 260 switch (state) { 261 case STATE_1: 262 setNextState(State.STATE_2); 263 break; 264 case STATE_2: 265 setNextState(State.STATE_3); 266 break; 267 case STATE_3: 268 if (abortOnFinalStep) { 269 setFailure("test", new IOException("Requested abort on final step")); 270 } 271 return Flow.NO_MORE_STATE; 272 default: 273 throw new UnsupportedOperationException(); 274 } 275 return Flow.HAS_MORE_STATE; 276 } 277 278 @Override 279 protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException { 280 final long ts = env.nextTimestamp(); 281 LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts); 282 executionInfo.add(new ExecutionInfo(ts, state, true)); 283 Thread.sleep(150); 284 285 if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) { 286 LOG.debug("THROW INTERRUPT"); 287 throw new InterruptedException("test interrupt"); 288 } 289 290 switch (state) { 291 case STATE_1: 292 break; 293 case STATE_2: 294 break; 295 case STATE_3: 296 break; 297 default: 298 throw new UnsupportedOperationException(); 299 } 300 } 301 302 @Override 303 protected State getState(final int stateId) { 304 return State.values()[stateId]; 305 } 306 307 @Override 308 protected int getStateId(final State state) { 309 return state.ordinal(); 310 } 311 312 @Override 313 protected State getInitialState() { 314 return State.STATE_1; 315 } 316 317 @Override 318 protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) { 319 return true; 320 } 321 322 @Override 323 protected boolean abort(TestProcEnv env) { 324 aborted.set(true); 325 return true; 326 } 327 } 328 329 public static class TestYieldProcedure extends Procedure<TestProcEnv> { 330 private int step = 0; 331 332 public TestYieldProcedure() { 333 } 334 335 @Override 336 protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException { 337 LOG.info("execute step " + step); 338 if (step++ < 5) { 339 throw new ProcedureYieldException(); 340 } 341 return null; 342 } 343 344 @Override 345 protected void rollback(TestProcEnv env) { 346 } 347 348 @Override 349 protected boolean abort(TestProcEnv env) { 350 return false; 351 } 352 353 @Override 354 protected boolean isYieldAfterExecutionStep(final TestProcEnv env) { 355 return true; 356 } 357 358 @Override 359 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 360 } 361 362 @Override 363 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 364 } 365 } 366 367 private static class TestScheduler extends SimpleProcedureScheduler { 368 private int completionCalls; 369 private int addFrontCalls; 370 private int addBackCalls; 371 private int yieldCalls; 372 private int pollCalls; 373 374 public TestScheduler() { 375 } 376 377 @Override 378 public void addFront(final Procedure proc) { 379 addFrontCalls++; 380 super.addFront(proc); 381 } 382 383 @Override 384 public void addBack(final Procedure proc) { 385 addBackCalls++; 386 super.addBack(proc); 387 } 388 389 @Override 390 public void yield(final Procedure proc) { 391 yieldCalls++; 392 super.yield(proc); 393 } 394 395 @Override 396 public Procedure poll() { 397 pollCalls++; 398 return super.poll(); 399 } 400 401 @Override 402 public Procedure poll(long timeout, TimeUnit unit) { 403 pollCalls++; 404 return super.poll(timeout, unit); 405 } 406 407 @Override 408 public void completionCleanup(Procedure proc) { 409 completionCalls++; 410 } 411 } 412}