001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import org.apache.hadoop.fs.FileSystem; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 029import org.apache.hadoop.hbase.testclassification.MasterTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.junit.After; 032import org.junit.Before; 033import org.junit.ClassRule; 034import org.junit.Test; 035import org.junit.experimental.categories.Category; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 038 039@Category({ MasterTests.class, SmallTests.class }) 040public class TestProcedureSkipPersistence { 041 042 @ClassRule 043 public static final HBaseClassTestRule CLASS_RULE = 044 HBaseClassTestRule.forClass(TestProcedureSkipPersistence.class); 045 private ProcedureExecutor<ProcEnv> procExecutor; 046 private ProcedureStore procStore; 047 048 private HBaseCommonTestingUtil htu; 049 private FileSystem fs; 050 private Path testDir; 051 private Path logDir; 052 053 private static volatile int STEP = 0; 054 055 public class ProcEnv { 056 057 public ProcedureExecutor<ProcEnv> getProcedureExecutor() { 058 return procExecutor; 059 } 060 } 061 062 public static class TestProcedure extends Procedure<ProcEnv> { 063 064 // need to override this method, otherwise we will persist the release lock operation and the 065 // test will fail. 066 @Override 067 protected boolean holdLock(ProcEnv env) { 068 return true; 069 } 070 071 @Override 072 protected Procedure<ProcEnv>[] execute(ProcEnv env) 073 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 074 if (STEP == 0) { 075 STEP = 1; 076 setTimeout(60 * 60 * 1000); 077 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 078 skipPersistence(); 079 throw new ProcedureSuspendedException(); 080 } else if (STEP == 1) { 081 STEP = 2; 082 if (hasTimeout()) { 083 setFailure("Should not persist the timeout value", 084 new IOException("Should not persist the timeout value")); 085 return null; 086 } 087 setTimeout(2 * 1000); 088 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 089 // used to confirm that we reset the persist flag before execution 090 throw new ProcedureSuspendedException(); 091 } else { 092 if (!hasTimeout()) { 093 setFailure("Should have persisted the timeout value", 094 new IOException("Should have persisted the timeout value")); 095 } 096 return null; 097 } 098 } 099 100 @Override 101 protected synchronized boolean setTimeoutFailure(ProcEnv env) { 102 setState(ProcedureProtos.ProcedureState.RUNNABLE); 103 env.getProcedureExecutor().getProcedureScheduler().addFront(this); 104 return false; 105 } 106 107 @Override 108 protected void rollback(ProcEnv env) throws IOException, InterruptedException { 109 throw new UnsupportedOperationException(); 110 } 111 112 @Override 113 protected boolean abort(ProcEnv env) { 114 return false; 115 } 116 117 @Override 118 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 119 } 120 121 @Override 122 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 123 } 124 } 125 126 @Before 127 public void setUp() throws IOException { 128 htu = new HBaseCommonTestingUtil(); 129 testDir = htu.getDataTestDir(); 130 fs = testDir.getFileSystem(htu.getConfiguration()); 131 assertTrue(testDir.depth() > 1); 132 133 logDir = new Path(testDir, "proc-logs"); 134 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 135 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new ProcEnv(), procStore); 136 procStore.start(1); 137 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 138 } 139 140 @After 141 public void tearDown() throws IOException { 142 procExecutor.stop(); 143 procStore.stop(false); 144 fs.delete(logDir, true); 145 } 146 147 @Test 148 public void test() throws Exception { 149 TestProcedure proc = new TestProcedure(); 150 long procId = procExecutor.submitProcedure(proc); 151 htu.waitFor(30000, () -> proc.isWaiting() && procExecutor.getActiveExecutorCount() == 0); 152 ProcedureTestingUtility.restart(procExecutor); 153 htu.waitFor(30000, () -> { 154 Procedure<?> p = procExecutor.getProcedure(procId); 155 return (p.isWaiting() || p.isFinished()) && procExecutor.getActiveExecutorCount() == 0; 156 }); 157 assertFalse(procExecutor.isFinished(procId)); 158 ProcedureTestingUtility.restart(procExecutor); 159 htu.waitFor(30000, () -> procExecutor.isFinished(procId)); 160 Procedure<ProcEnv> p = procExecutor.getResult(procId); 161 assertTrue(p.isSuccess()); 162 } 163}