/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises procedures that yield and checks how often the scheduler is called as a result.
 * <p>
 * Each test submits one or more test procedures to a {@link ProcedureExecutor} backed by a
 * counting {@link TestScheduler}, waits for them to finish, and then asserts both the recorded
 * execution/rollback history and the number of {@code addFront}/{@code addBack}/{@code yield}/
 * {@code poll}/{@code completionCleanup} invocations observed by the scheduler.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestYieldProcedures {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestYieldProcedures.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);

  /** Slot count handed to both the procedure store and the executor workers in {@link #setUp()}. */
  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  private ProcedureExecutor<TestProcEnv> procExecutor;
  private TestScheduler procRunnables;
  private ProcedureStore procStore;

  private HBaseCommonTestingUtility htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Creates a WAL-backed procedure store under the test data directory, wires it to a fresh
   * {@link TestScheduler} and {@link ProcedureExecutor}, and starts the store and the workers.
   * @throws IOException if the file system, the store or the executor cannot be set up
   */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtility();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procRunnables = new TestScheduler();
    procExecutor =
      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
  }

  /**
   * Stops the executor and the store and removes the procedure log directory.
   * <p>
   * Every step is guarded and chained through {@code finally} so that a partially completed
   * {@link #setUp()} (fields still {@code null}) or a failure in an earlier step does not prevent
   * the remaining clean-up from running.
   * @throws IOException if deleting the log directory fails
   */
  @After
  public void tearDown() throws IOException {
    try {
      if (procExecutor != null) {
        procExecutor.stop();
      }
    } finally {
      try {
        if (procStore != null) {
          procStore.stop(false);
        }
      } finally {
        if (fs != null && logDir != null) {
          fs.delete(logDir, true);
        }
      }
    }
  }

  /**
   * Submits three state-machine procedures that fail on their final step and verifies that each
   * one recorded every state once in execution order followed by every state once in reverse
   * order as rollback, then checks the scheduler call counters.
   */
  @Test
  public void testYieldEachExecutionStep() throws Exception {
    final int NUM_STATES = 3;

    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
    for (int i = 0; i < procs.length; ++i) {
      // abortOnFinalStep=true, throwInterruptOnceOnEachStep=false
      procs[i] = new TestStateMachineProcedure(true, false);
      procExecutor.submitProcedure(procs[i]);
    }
    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);

    for (int i = 0; i < procs.length; ++i) {
      // one execute record plus one rollback record per state
      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());

      // verify execution
      int index = 0;
      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
        assertEquals(false, info.isRollback());
        assertEquals(execStep, info.getStep().ordinal());
      }

      // verify rollback
      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
        assertEquals(true, info.isRollback());
        assertEquals(execStep, info.getStep().ordinal());
      }
    }

    // check runnable queue stats
    assertEquals(0, procRunnables.size());
    assertEquals(0, procRunnables.addFrontCalls.get());
    assertEquals(15, procRunnables.addBackCalls.get());
    assertEquals(12, procRunnables.yieldCalls.get());
    assertEquals(16, procRunnables.pollCalls.get());
    assertEquals(3, procRunnables.completionCalls.get());
  }

  /**
   * Runs a single state-machine procedure that throws {@link InterruptedException} on every other
   * recorded step and fails on its final state, then verifies the recorded history and the
   * scheduler call counters.
   */
  @Test
  public void testYieldOnInterrupt() throws Exception {
    final int NUM_STATES = 3;
    int count = 0;

    // abortOnFinalStep=true, throwInterruptOnceOnEachStep=true
    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
    ProcedureTestingUtility.submitAndWait(procExecutor, proc);

    // test execute (we execute steps twice, one has the IE the other completes)
    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
    for (int i = 0; i < NUM_STATES; ++i) {
      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
      assertEquals(false, info.isRollback());
      assertEquals(i, info.getStep().ordinal());

      info = proc.getExecutionInfo().get(count++);
      assertEquals(false, info.isRollback());
      assertEquals(i, info.getStep().ordinal());
    }

    // test rollback (we execute steps twice, rollback counts both IE and completed)
    for (int i = NUM_STATES - 1; i >= 0; --i) {
      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
      assertEquals(true, info.isRollback());
      assertEquals(i, info.getStep().ordinal());
    }

    // the remaining NUM_STATES records are all expected to be rollbacks of the first state
    for (int i = NUM_STATES - 1; i >= 0; --i) {
      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
      assertEquals(true, info.isRollback());
      assertEquals(0, info.getStep().ordinal());
    }

    // check runnable queue stats
    assertEquals(0, procRunnables.size());
    assertEquals(0, procRunnables.addFrontCalls.get());
    assertEquals(11, procRunnables.addBackCalls.get());
    assertEquals(10, procRunnables.yieldCalls.get());
    assertEquals(12, procRunnables.pollCalls.get());
    assertEquals(1, procRunnables.completionCalls.get());
  }

  /**
   * Runs a procedure that throws {@link ProcedureYieldException} on its first five executions and
   * verifies that it was executed six times in total, then checks the scheduler call counters.
   */
  @Test
  public void testYieldException() {
    TestYieldProcedure proc = new TestYieldProcedure();
    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    assertEquals(6, proc.step);

    // check runnable queue stats
    assertEquals(0, procRunnables.size());
    assertEquals(0, procRunnables.addFrontCalls.get());
    assertEquals(6, procRunnables.addBackCalls.get());
    assertEquals(5, procRunnables.yieldCalls.get());
    assertEquals(7, procRunnables.pollCalls.get());
    assertEquals(1, procRunnables.completionCalls.get());
  }

  /** Procedure environment that only hands out monotonically increasing timestamps. */
  private static class TestProcEnv {
    public final AtomicLong timestamp = new AtomicLong(0);

    /** Returns the next timestamp; the first call returns {@code 1}. */
    public long nextTimestamp() {
      return timestamp.incrementAndGet();
    }
  }

  /**
   * Three-state procedure that records every execute and rollback invocation in
   * {@link #getExecutionInfo()} and can optionally fail on the last state and/or throw
   * {@link InterruptedException} on alternating invocations.
   */
  public static class TestStateMachineProcedure
    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
    enum State {
      STATE_1,
      STATE_2,
      STATE_3
    }

    /** Immutable record of one execute or rollback invocation. */
    public static class ExecutionInfo {
      private final boolean rollback;
      private final long timestamp;
      private final State step;

      /**
       * @param timestamp  value obtained from {@link TestProcEnv#nextTimestamp()} for this call
       * @param step       state that was executed or rolled back
       * @param isRollback {@code true} if recorded from {@code rollbackState}
       */
      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
        this.timestamp = timestamp;
        this.step = step;
        this.rollback = isRollback;
      }

      public State getStep() {
        return step;
      }

      public long getTimestamp() {
        return timestamp;
      }

      public boolean isRollback() {
        return rollback;
      }
    }

    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
    private final AtomicBoolean aborted = new AtomicBoolean(false);
    private final boolean throwInterruptOnceOnEachStep;
    private final boolean abortOnFinalStep;

    /** Creates a procedure that neither fails on the final step nor throws interrupts. */
    public TestStateMachineProcedure() {
      this(false, false);
    }

    /**
     * @param abortOnFinalStep             if {@code true}, {@code STATE_3} marks the procedure as
     *                                     failed before reporting that no more states remain
     * @param throwInterruptOnceOnEachStep if {@code true}, an {@link InterruptedException} is
     *                                     thrown whenever the invocation just recorded sits at an
     *                                     even index of the history
     */
    public TestStateMachineProcedure(boolean abortOnFinalStep,
      boolean throwInterruptOnceOnEachStep) {
      this.abortOnFinalStep = abortOnFinalStep;
      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
    }

    /** Returns the live list of recorded invocations, in the order they happened. */
    public ArrayList<ExecutionInfo> getExecutionInfo() {
      return executionInfo;
    }

    @Override
    protected boolean isRollbackSupported(State state) {
      return true;
    }

    /**
     * Records the invocation, sleeps briefly, optionally throws the test interrupt, and then
     * advances {@code STATE_1 -> STATE_2 -> STATE_3}. On {@code STATE_3} the procedure is marked
     * failed when {@code abortOnFinalStep} is set and {@link Flow#NO_MORE_STATE} is returned.
     */
    @Override
    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
      throws InterruptedException {
      final long ts = env.nextTimestamp();
      LOG.info("{} execute step {} ts={}", getProcId(), state, ts);
      executionInfo.add(new ExecutionInfo(ts, state, false));
      Thread.sleep(150);

      maybeThrowTestInterrupt();

      switch (state) {
        case STATE_1:
          setNextState(State.STATE_2);
          break;
        case STATE_2:
          setNextState(State.STATE_3);
          break;
        case STATE_3:
          if (abortOnFinalStep) {
            setFailure("test", new IOException("Requested abort on final step"));
          }
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException();
      }
      return Flow.HAS_MORE_STATE;
    }

    /**
     * Records the rollback invocation, sleeps briefly and optionally throws the test interrupt.
     * No per-state undo work is performed; unknown states are rejected.
     */
    @Override
    protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException {
      final long ts = env.nextTimestamp();
      LOG.debug("{} rollback state {} ts={}", getProcId(), state, ts);
      executionInfo.add(new ExecutionInfo(ts, state, true));
      Thread.sleep(150);

      maybeThrowTestInterrupt();

      switch (state) {
        case STATE_1:
          break;
        case STATE_2:
          break;
        case STATE_3:
          break;
        default:
          throw new UnsupportedOperationException();
      }
    }

    /**
     * Throws the injected interrupt when interrupt injection is enabled and the record that was
     * just appended sits at an even index of {@link #executionInfo}.
     */
    private void maybeThrowTestInterrupt() throws InterruptedException {
      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
        LOG.debug("THROW INTERRUPT");
        throw new InterruptedException("test interrupt");
      }
    }

    @Override
    protected State getState(final int stateId) {
      return State.values()[stateId];
    }

    @Override
    protected int getStateId(final State state) {
      return state.ordinal();
    }

    @Override
    protected State getInitialState() {
      return State.STATE_1;
    }

    @Override
    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
      return true;
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      aborted.set(true);
      return true;
    }
  }

  /**
   * Procedure that throws {@link ProcedureYieldException} on its first five executions and
   * completes (returns {@code null}) on the sixth.
   */
  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
    /** Number of times {@link #execute(TestProcEnv)} has been entered. */
    private int step = 0;

    public TestYieldProcedure() {
    }

    @Override
    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
      LOG.info("execute step {}", step);
      // post-increment: the check sees 0..4 for the first five calls, 5 for the sixth
      if (step++ < 5) {
        throw new ProcedureYieldException();
      }
      return null;
    }

    @Override
    protected void rollback(TestProcEnv env) {
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      return false;
    }

    @Override
    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
      return true;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    }
  }

  /**
   * {@link SimpleProcedureScheduler} that counts how often each entry point is invoked.
   * <p>
   * The counters are {@link AtomicLong}s because they are incremented by whichever thread calls
   * into the scheduler and are read afterwards by the test thread; a plain {@code int++} would be
   * an unsynchronized read-modify-write.
   */
  private static class TestScheduler extends SimpleProcedureScheduler {
    private final AtomicLong completionCalls = new AtomicLong();
    private final AtomicLong addFrontCalls = new AtomicLong();
    private final AtomicLong addBackCalls = new AtomicLong();
    private final AtomicLong yieldCalls = new AtomicLong();
    private final AtomicLong pollCalls = new AtomicLong();

    public TestScheduler() {
    }

    @Override
    public void addFront(final Procedure proc) {
      addFrontCalls.incrementAndGet();
      super.addFront(proc);
    }

    @Override
    public void addBack(final Procedure proc) {
      addBackCalls.incrementAndGet();
      super.addBack(proc);
    }

    @Override
    public void yield(final Procedure proc) {
      yieldCalls.incrementAndGet();
      super.yield(proc);
    }

    @Override
    public Procedure poll() {
      pollCalls.incrementAndGet();
      return super.poll();
    }

    @Override
    public Procedure poll(long timeout, TimeUnit unit) {
      pollCalls.incrementAndGet();
      return super.poll(timeout, unit);
    }

    /** Only counts the call; the superclass implementation is intentionally not invoked. */
    @Override
    public void completionCleanup(Procedure proc) {
      completionCalls.incrementAndGet();
    }
  }
}