001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.concurrent.atomic.AtomicLong; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 032import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.junit.After; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Ignore; 039import org.junit.Test; 040import org.junit.experimental.categories.Category; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; 045 046/** 047 * For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so 048 * we should use lock to obtain the correct order. Ignored. 049 */ 050@Ignore 051@Category({ MasterTests.class, SmallTests.class }) 052public class TestProcedureReplayOrder { 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestProcedureReplayOrder.class); 056 057 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureReplayOrder.class); 058 059 private static final int NUM_THREADS = 16; 060 061 private ProcedureExecutor<TestProcedureEnv> procExecutor; 062 private TestProcedureEnv procEnv; 063 private ProcedureStore procStore; 064 065 private HBaseCommonTestingUtil htu; 066 private FileSystem fs; 067 private Path testDir; 068 private Path logDir; 069 070 @Before 071 public void setUp() throws IOException { 072 htu = new HBaseCommonTestingUtil(); 073 htu.getConfiguration().setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 25); 074 075 testDir = htu.getDataTestDir(); 076 fs = testDir.getFileSystem(htu.getConfiguration()); 077 assertTrue(testDir.depth() > 1); 078 079 logDir = new Path(testDir, "proc-logs"); 080 procEnv = new TestProcedureEnv(); 081 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 082 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 083 procStore.start(NUM_THREADS); 084 ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); 085 } 086 087 @After 088 public void tearDown() throws IOException { 089 procExecutor.stop(); 090 procStore.stop(false); 091 fs.delete(logDir, true); 092 } 093 094 @Test 095 public void testSingleStepReplayOrder() throws Exception { 096 final int NUM_PROC_XTHREAD = 32; 097 final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD; 098 099 // submit the procedures 100 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class); 101 102 while (procEnv.getExecId() < NUM_PROCS) { 103 Thread.sleep(100); 104 } 105 106 // restart the executor and allow the procedures to run 107 ProcedureTestingUtility.restart(procExecutor); 108 109 // wait the execution of all the procedures and 110 // assert that the execution order was sorted by procId 111 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 112 procEnv.assertSortedExecList(NUM_PROCS); 113 } 114 115 @Test 116 public void testMultiStepReplayOrder() throws Exception { 117 final int NUM_PROC_XTHREAD = 24; 118 final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2); 119 120 // submit the procedures 121 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class); 122 123 while (procEnv.getExecId() < NUM_PROCS) { 124 Thread.sleep(100); 125 } 126 127 // restart the executor and allow the procedures to run 128 ProcedureTestingUtility.restart(procExecutor); 129 130 // wait the execution of all the procedures and 131 // assert that the execution order was sorted by procId 132 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 133 procEnv.assertSortedExecList(NUM_PROCS); 134 } 135 136 private void submitProcedures(final int nthreads, final int nprocPerThread, 137 final Class<?> procClazz) throws Exception { 138 Thread[] submitThreads = new Thread[nthreads]; 139 for (int i = 0; i < submitThreads.length; ++i) { 140 submitThreads[i] = new Thread() { 141 @Override 142 public void run() { 143 for (int i = 0; i < nprocPerThread; ++i) { 144 try { 145 procExecutor 146 .submitProcedure((Procedure) procClazz.getDeclaredConstructor().newInstance()); 147 } catch (Exception e) { 148 LOG.error("unable to instantiate the procedure", e); 149 fail("failure during the proc.newInstance(): " + e.getMessage()); 150 } 151 } 152 } 153 }; 154 } 155 156 for (int i = 0; i < submitThreads.length; ++i) { 157 submitThreads[i].start(); 158 } 159 160 for (int i = 0; i < submitThreads.length; ++i) { 161 submitThreads[i].join(); 162 } 163 } 164 165 private static class TestProcedureEnv { 166 private ArrayList<TestProcedure> execList = new ArrayList<>(); 167 private AtomicLong execTimestamp = new AtomicLong(0); 168 169 public long getExecId() { 170 return execTimestamp.get(); 171 } 172 173 public long nextExecId() { 174 return execTimestamp.incrementAndGet(); 175 } 176 177 public void addToExecList(final TestProcedure proc) { 178 execList.add(proc); 179 } 180 181 public void assertSortedExecList(int numProcs) { 182 assertEquals(numProcs, execList.size()); 183 LOG.debug("EXEC LIST: " + execList); 184 for (int i = 0; i < execList.size() - 1; ++i) { 185 TestProcedure a = execList.get(i); 186 TestProcedure b = execList.get(i + 1); 187 assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId()); 188 } 189 } 190 } 191 192 public static abstract class TestProcedure extends Procedure<TestProcedureEnv> { 193 protected long execId = 0; 194 protected int step = 0; 195 196 public long getExecId() { 197 return execId; 198 } 199 200 @Override 201 protected void rollback(TestProcedureEnv env) { 202 } 203 204 @Override 205 protected boolean abort(TestProcedureEnv env) { 206 return true; 207 } 208 209 @Override 210 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 211 Int64Value.Builder builder = Int64Value.newBuilder().setValue(execId); 212 serializer.serialize(builder.build()); 213 } 214 215 @Override 216 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 217 Int64Value value = serializer.deserialize(Int64Value.class); 218 execId = value.getValue(); 219 step = 2; 220 } 221 } 222 223 public static class TestSingleStepProcedure extends TestProcedure { 224 public TestSingleStepProcedure() { 225 } 226 227 @Override 228 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { 229 LOG.trace("execute procedure step=" + step + ": " + this); 230 if (step == 0) { 231 step = 1; 232 execId = env.nextExecId(); 233 return new Procedure[] { this }; 234 } else if (step == 2) { 235 env.addToExecList(this); 236 return null; 237 } 238 throw new ProcedureYieldException(); 239 } 240 241 @Override 242 public String toString() { 243 return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")"; 244 } 245 } 246 247 public static class TestTwoStepProcedure extends TestProcedure { 248 public TestTwoStepProcedure() { 249 } 250 251 @Override 252 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException { 253 LOG.trace("execute procedure step=" + step + ": " + this); 254 if (step == 0) { 255 step = 1; 256 execId = env.nextExecId(); 257 return new Procedure[] { new TestSingleStepProcedure() }; 258 } else if (step == 2) { 259 env.addToExecList(this); 260 return null; 261 } 262 throw new ProcedureYieldException(); 263 } 264 265 @Override 266 public String toString() { 267 return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")"; 268 } 269 } 270}