/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test Procedure coordinator operation.
 * <p>
 * This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
 * level parallelization this class will likely throw all kinds of errors.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestProcedureCoordinator {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureCoordinator.class);

  // general test constants
  private static final long WAKE_FREQUENCY = 1000;
  private static final long TIMEOUT = 100000;
  private static final long POOL_KEEP_ALIVE = 1;
  private static final String nodeName = "node";
  private static final String procName = "some op";
  private static final byte[] procData = new byte[0];
  private static final List<String> expected = Lists.newArrayList("remote1", "remote2");

  // setup the mocks
  private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
  private final Procedure task = mock(Procedure.class);
  private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);

  // handle to the coordinator for each test; closed by resetTest() when non-null
  private ProcedureCoordinator coordinator;

  /**
   * Resets the shared mocks and closes the coordinator built by the test, if any.
   * @throws IOException if closing the coordinator fails
   */
  @After
  public void resetTest() throws IOException {
    // reset all the mocks used for the tests
    reset(controller, task, monitor);
    // close the open coordinator, if it was used
    if (coordinator != null) {
      coordinator.close();
    }
  }

  /**
   * Builds a spied {@link ProcedureCoordinator} backed by the mock controller and a pool created
   * via {@link ProcedureCoordinator#defaultPool} with a size argument of 1.
   * @return a Mockito spy wrapping a new coordinator
   */
  private ProcedureCoordinator buildNewCoordinator() {
    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
    return spy(new ProcedureCoordinator(controller, pool));
  }

  /**
   * Currently we can only handle one procedure at a time. This makes sure we handle that and reject
   * submitting more.
   */
  @Test
  public void testThreadPoolSize() throws Exception {
    // Assign to the field (not a shadowing local) so that resetTest() closes this coordinator
    // once the test is over; a local would leave the field null and skip the close.
    coordinator = buildNewCoordinator();
    Procedure proc =
      new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
    Procedure procSpy = spy(proc);

    Procedure proc2 = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName + "2",
      procData, expected);
    Procedure procSpy2 = spy(proc2);
    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList()))
      .thenReturn(procSpy, procSpy2);

    coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
    // null here means second procedure failed to start.
    assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
      coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
  }

  /**
   * Check handling a connection failure correctly if we get it during the acquiring phase
   */
  @Test
  public void testUnreachableControllerDuringPrepare() throws Exception {
    coordinator = buildNewCoordinator();
    // setup the proc
    List<String> expected = Arrays.asList("cohort");
    Procedure proc =
      new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
    final Procedure procSpy = spy(proc);

    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList()))
      .thenReturn(procSpy);

    // make the controller fail as soon as the acquire request is sent
    IOException cause = new IOException("Failed to reach comms during acquire");
    doThrow(cause).when(controller).sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyList());

    // run the operation
    proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
    // and wait for it to finish
    while (!proc.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)) {
      // keep polling until the completed latch is released
    }
    verify(procSpy, atLeastOnce()).receive(any());
    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
    verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
    verify(controller, never()).sendGlobalBarrierReached(any(), anyList());
  }

  /**
   * Check handling a connection failure correctly if we get it during the barrier phase
   */
  @Test
  public void testUnreachableControllerDuringCommit() throws Exception {
    coordinator = buildNewCoordinator();

    // setup the task and spy on it
    List<String> expected = Arrays.asList("cohort");
    final Procedure spy =
      spy(new Procedure(coordinator, WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));

    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy);

    // let the acquire phase succeed, then fail when the barrier-reached request is sent
    IOException cause = new IOException("Failed to reach controller during commit");
    doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" })).when(controller)
      .sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList());
    doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyList());

    // run the operation
    Procedure task =
      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
    // and wait for it to finish
    while (!task.completedLatch.await(WAKE_FREQUENCY, TimeUnit.MILLISECONDS)) {
      // keep polling until the completed latch is released
    }
    verify(spy, atLeastOnce()).receive(any());
    verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
    verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyList());
    verify(controller, times(1)).sendGlobalBarrierReached(any(), anyList());
  }

  @Test
  public void testNoCohort() throws Exception {
    runSimpleProcedure();
  }

  @Test
  public void testSingleCohortOrchestration() throws Exception {
    runSimpleProcedure("one");
  }

  @Test
  public void testMultipleCohortOrchestration() throws Exception {
    runSimpleProcedure("one", "two", "three", "four");
  }

  /**
   * Builds a fresh coordinator and runs a spied {@link Procedure} with the standard name and data
   * against the given cohort, using the default acquire/commit answers.
   * @param members expected cohort members (may be empty)
   * @throws Exception on failure
   */
  public void runSimpleProcedure(String... members) throws Exception {
    coordinator = buildNewCoordinator();
    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
      procData, Arrays.asList(members));
    final Procedure spy = spy(task);
    runCoordinatedProcedure(spy, members);
  }

  /**
   * Test that if nodes join the barrier early we still correctly handle the progress
   */
  @Test
  public void testEarlyJoiningBarrier() throws Exception {
    final String[] cohort = new String[] { "one", "two", "three", "four" };
    coordinator = buildNewCoordinator();
    final ProcedureCoordinator ref = coordinator;
    Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY, TIMEOUT, procName,
      procData, Arrays.asList(cohort));
    final Procedure spy = spy(task);

    AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
      @Override
      public void doWork() {
        // then do some fun where we commit before all nodes have prepared
        // "one" commits before anyone else is done
        ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
        ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]);
        // but "two" takes a while
        ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
        // "three" jumps ahead
        ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
        ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]);
        // and "four" takes a while
        ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
      }
    };

    BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
      @Override
      public void doWork() {
        // the two slow members ("two" and "four") finish only once the barrier is reached
        ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]);
        ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]);
      }
    };
    runCoordinatedOperation(spy, prepare, commit, cohort);
  }

  /**
   * Just run a procedure with the standard name and data, with no special task for the mock
   * coordinator (it works just like a regular coordinator). For custom behavior see
   * {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])} .
   * @param spy    Spy on a real {@link Procedure}
   * @param cohort expected cohort members
   * @throws Exception on failure
   */
  public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
      new BarrierAnswer(procName, cohort), cohort);
  }

  /**
   * Runs the procedure with a custom acquire answer and the default {@link BarrierAnswer}.
   * @param spy     Spy on a real {@link Procedure}
   * @param prepare answer to run when the controller is asked to send the acquire request
   * @param cohort  expected cohort members
   * @throws Exception on failure
   */
  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare, String... cohort)
    throws Exception {
    runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
  }

  /**
   * Runs the procedure with the default {@link AcquireBarrierAnswer} and a custom commit answer.
   * @param spy    Spy on a real {@link Procedure}
   * @param commit answer to run when the controller is asked to send the barrier-reached request
   * @param cohort expected cohort members
   * @throws Exception on failure
   */
  public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit, String... cohort)
    throws Exception {
    runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
  }

  /**
   * Stubs the coordinator to hand out {@code spy}, wires the given answers onto the mock
   * controller, starts the procedure, waits for it to complete and then verifies that the
   * start/acquire/reached calls happened in the expected order.
   * @param spy              Spy on a real {@link Procedure}
   * @param prepareOperation answer run on {@code sendGlobalBarrierAcquire}
   * @param commitOperation  answer run on {@code sendGlobalBarrierReached}
   * @param cohort           expected cohort members
   * @throws Exception on failure
   */
  public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
    BarrierAnswer commitOperation, String... cohort) throws Exception {
    List<String> expected = Arrays.asList(cohort);
    when(coordinator.createProcedure(any(), eq(procName), eq(procData), anyList())).thenReturn(spy);

    // use the passed controller responses
    doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
    doAnswer(commitOperation).when(controller).sendGlobalBarrierReached(eq(spy), anyList());

    // run the operation
    Procedure task =
      coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
    // and wait for it to finish
    task.waitForCompleted();

    // make sure we mocked correctly
    prepareOperation.ensureRan();
    // we never got an exception
    InOrder inorder = inOrder(spy, controller);
    inorder.verify(spy).sendGlobalBarrierStart();
    inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
    inorder.verify(spy).sendGlobalBarrierReached();
    inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyList());
  }

  /**
   * Base Mockito answer that records whether it was invoked and then delegates to
   * {@link #doWork()}.
   */
  private static abstract class OperationAnswer implements Answer<Void> {
    private boolean ran = false;

    /** Asserts that {@link #answer(InvocationOnMock)} was invoked at least once. */
    public void ensureRan() {
      assertTrue("Prepare mocking didn't actually run!", ran);
    }

    @Override
    public final Void answer(InvocationOnMock invocation) throws Throwable {
      this.ran = true;
      doWork();
      return null;
    }

    /** The behavior to perform when the stubbed call is made. */
    protected abstract void doWork() throws Throwable;
  }

  /**
   * Just tell the current coordinator that each of the nodes has prepared
   */
  private class AcquireBarrierAnswer extends OperationAnswer {
    protected final String[] cohort;
    protected final String opName;

    public AcquireBarrierAnswer(String opName, String... cohort) {
      this.cohort = cohort;
      this.opName = opName;
    }

    @Override
    public void doWork() {
      if (cohort == null) {
        return;
      }
      for (String member : cohort) {
        TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
      }
    }
  }

  /**
   * Just tell the current coordinator that each of the nodes has committed
   */
  private class BarrierAnswer extends OperationAnswer {
    protected final String[] cohort;
    protected final String opName;

    public BarrierAnswer(String opName, String... cohort) {
      this.cohort = cohort;
      this.opName = opName;
    }

    @Override
    public void doWork() {
      if (cohort == null) {
        return;
      }
      for (String member : cohort) {
        TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member,
          new byte[0]);
      }
    }
  }
}