001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicReference; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.NonceKey; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@Category({ MasterTests.class, SmallTests.class }) 047public class TestProcedureNonce { 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestProcedureNonce.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureNonce.class); 053 054 private static final int PROCEDURE_EXECUTOR_SLOTS = 2; 055 056 private static TestProcEnv procEnv; 057 private static ProcedureExecutor<TestProcEnv> procExecutor; 058 private static ProcedureStore procStore; 059 060 private HBaseCommonTestingUtil htu; 061 private FileSystem fs; 062 private Path logDir; 063 064 @Before 065 public void setUp() throws IOException { 066 htu = new HBaseCommonTestingUtil(); 067 Path testDir = htu.getDataTestDir(); 068 fs = testDir.getFileSystem(htu.getConfiguration()); 069 assertTrue(testDir.depth() > 1); 070 071 logDir = new Path(testDir, "proc-logs"); 072 procEnv = new TestProcEnv(); 073 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 074 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 075 procExecutor.testing = new ProcedureExecutor.Testing(); 076 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 077 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 078 } 079 080 @After 081 public void tearDown() throws IOException { 082 procExecutor.stop(); 083 procStore.stop(false); 084 fs.delete(logDir, true); 085 } 086 087 @Test 088 public void testCompletedProcWithSameNonce() throws Exception { 089 final long nonceGroup = 123; 090 final long nonce = 2222; 091 092 // register the nonce 093 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 094 assertFalse(procExecutor.registerNonce(nonceKey) >= 0); 095 096 // Submit a proc and wait for its completion 097 Procedure proc = new TestSingleStepProcedure(); 098 long procId = procExecutor.submitProcedure(proc, nonceKey); 099 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 100 101 // Restart 102 ProcedureTestingUtility.restart(procExecutor); 103 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 104 105 // try to register a procedure with the same nonce 106 // we should get back the old procId 107 assertEquals(procId, procExecutor.registerNonce(nonceKey)); 108 109 Procedure<?> result = procExecutor.getResult(procId); 110 ProcedureTestingUtility.assertProcNotFailed(result); 111 } 112 113 @Test 114 public void testRunningProcWithSameNonce() throws Exception { 115 final long nonceGroup = 456; 116 final long nonce = 33333; 117 118 // register the nonce 119 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 120 assertFalse(procExecutor.registerNonce(nonceKey) >= 0); 121 122 // Submit a proc and use a latch to prevent the step execution until we submitted proc2 123 CountDownLatch latch = new CountDownLatch(1); 124 TestSingleStepProcedure proc = new TestSingleStepProcedure(); 125 procEnv.setWaitLatch(latch); 126 long procId = procExecutor.submitProcedure(proc, nonceKey); 127 while (proc.step != 1) { 128 Threads.sleep(25); 129 } 130 131 // try to register a procedure with the same nonce 132 // we should get back the old procId 133 assertEquals(procId, procExecutor.registerNonce(nonceKey)); 134 135 // complete the procedure 136 latch.countDown(); 137 138 // Restart, the procedure is not completed yet 139 ProcedureTestingUtility.restart(procExecutor); 140 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 141 142 // try to register a procedure with the same nonce 143 // we should get back the old procId 144 assertEquals(procId, procExecutor.registerNonce(nonceKey)); 145 146 Procedure<?> result = procExecutor.getResult(procId); 147 ProcedureTestingUtility.assertProcNotFailed(result); 148 } 149 150 @Test 151 public void testSetFailureResultForNonce() throws IOException { 152 final long nonceGroup = 234; 153 final long nonce = 55555; 154 155 // check and register the request nonce 156 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 157 assertFalse(procExecutor.registerNonce(nonceKey) >= 0); 158 159 procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(), 160 new IOException("test failure")); 161 162 final long procId = procExecutor.registerNonce(nonceKey); 163 Procedure<?> result = procExecutor.getResult(procId); 164 ProcedureTestingUtility.assertProcFailed(result); 165 } 166 167 @Test 168 public void testConcurrentNonceRegistration() throws IOException { 169 testConcurrentNonceRegistration(true, 567, 44444); 170 } 171 172 @Test 173 public void testConcurrentNonceRegistrationWithRollback() throws IOException { 174 testConcurrentNonceRegistration(false, 890, 55555); 175 } 176 177 private void testConcurrentNonceRegistration(final boolean submitProcedure, final long nonceGroup, 178 final long nonce) throws IOException { 179 // register the nonce 180 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 181 182 final AtomicReference<Throwable> t1Exception = new AtomicReference(); 183 final AtomicReference<Throwable> t2Exception = new AtomicReference(); 184 185 final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1); 186 final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1); 187 final Thread[] threads = new Thread[2]; 188 threads[0] = new Thread() { 189 @Override 190 public void run() { 191 try { 192 // release the nonce and wake t2 193 assertFalse("unexpected already registered nonce", 194 procExecutor.registerNonce(nonceKey) >= 0); 195 t1NonceRegisteredLatch.countDown(); 196 197 // hold the submission until t2 is registering the nonce 198 t2BeforeNonceRegisteredLatch.await(); 199 Threads.sleep(1000); 200 201 if (submitProcedure) { 202 CountDownLatch latch = new CountDownLatch(1); 203 TestSingleStepProcedure proc = new TestSingleStepProcedure(); 204 procEnv.setWaitLatch(latch); 205 206 procExecutor.submitProcedure(proc, nonceKey); 207 Threads.sleep(100); 208 209 // complete the procedure 210 latch.countDown(); 211 } else { 212 procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey); 213 } 214 } catch (Throwable e) { 215 t1Exception.set(e); 216 } finally { 217 t1NonceRegisteredLatch.countDown(); 218 t2BeforeNonceRegisteredLatch.countDown(); 219 } 220 } 221 }; 222 223 threads[1] = new Thread() { 224 @Override 225 public void run() { 226 try { 227 // wait until t1 has registered the nonce 228 t1NonceRegisteredLatch.await(); 229 230 // register the nonce 231 t2BeforeNonceRegisteredLatch.countDown(); 232 assertFalse("unexpected non registered nonce", procExecutor.registerNonce(nonceKey) < 0); 233 } catch (Throwable e) { 234 t2Exception.set(e); 235 } finally { 236 t1NonceRegisteredLatch.countDown(); 237 t2BeforeNonceRegisteredLatch.countDown(); 238 } 239 } 240 }; 241 242 for (int i = 0; i < threads.length; ++i) { 243 threads[i].start(); 244 } 245 246 for (int i = 0; i < threads.length; ++i) { 247 Threads.shutdown(threads[i]); 248 } 249 250 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 251 assertEquals(null, t1Exception.get()); 252 assertEquals(null, t2Exception.get()); 253 } 254 255 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 256 private int step = 0; 257 258 public TestSingleStepProcedure() { 259 } 260 261 @Override 262 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 263 step++; 264 env.waitOnLatch(); 265 LOG.debug("execute procedure " + this + " step=" + step); 266 step++; 267 setResult(Bytes.toBytes(step)); 268 return null; 269 } 270 271 @Override 272 protected void rollback(TestProcEnv env) { 273 } 274 275 @Override 276 protected boolean abort(TestProcEnv env) { 277 return true; 278 } 279 } 280 281 private static class TestProcEnv { 282 private CountDownLatch latch = null; 283 284 /** 285 * set/unset a latch. every procedure execute() step will wait on the latch if any. 286 */ 287 public void setWaitLatch(CountDownLatch latch) { 288 this.latch = latch; 289 } 290 291 public void waitOnLatch() throws InterruptedException { 292 if (latch != null) { 293 latch.await(); 294 } 295 } 296 } 297}