001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.mockito.ArgumentMatchers.any; 021import static org.mockito.ArgumentMatchers.anyString; 022import static org.mockito.ArgumentMatchers.eq; 023import static org.mockito.Mockito.doAnswer; 024import static org.mockito.Mockito.doThrow; 025import static org.mockito.Mockito.inOrder; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.never; 028import static org.mockito.Mockito.reset; 029import static org.mockito.Mockito.spy; 030import static org.mockito.Mockito.verifyNoInteractions; 031import static org.mockito.Mockito.when; 032 033import java.io.IOException; 034import java.util.concurrent.ThreadPoolExecutor; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.errorhandling.ForeignException; 037import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 038import org.apache.hadoop.hbase.errorhandling.TimeoutException; 039import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl; 040import org.apache.hadoop.hbase.testclassification.MasterTests; 041import 
org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Test the procedure member and its error handling mechanisms.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestProcedureMember {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureMember.class);

  private static final long WAKE_FREQUENCY = 100;
  private static final long TIMEOUT = 100000;
  private static final long POOL_KEEP_ALIVE = 1;

  private final String op = "some op";
  private final byte[] data = new byte[0];
  private final ForeignExceptionDispatcher mockListener =
    Mockito.spy(new ForeignExceptionDispatcher());
  private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
  private final ProcedureMemberRpcs mockMemberComms = Mockito.mock(ProcedureMemberRpcs.class);
  private ProcedureMember member;
  private ForeignExceptionDispatcher dispatcher;
  /** Spy on the subprocedure that {@link #buildCohortMemberPair()} makes the builder return. */
  private Subprocedure spySub;

  /**
   * Reset all the mock objects and close the member used by the test.
   */
  @After
  public void resetTest() throws IOException {
    reset(mockListener, mockBuilder, mockMemberComms);
    Closeables.close(member, true);
  }

  /**
   * Build a member using the class level mocks and a fresh single-thread default pool.
   * @return member to use for tests
   */
  private ProcedureMember buildCohortMember() {
    String name = "node";
    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
    return new ProcedureMember(mockMemberComms, pool, mockBuilder);
  }

  /**
   * Set up {@link #member} so that its builder returns the spied-upon {@link Subprocedure}
   * ({@link #spySub}), and so that the mock RPCs move the subprocedure into its in-barrier phase
   * as soon as the member reports that it acquired the barrier.
   */
  private void buildCohortMemberPair() throws IOException {
    dispatcher = new ForeignExceptionDispatcher();
    member = buildCohortMember();
    // the member name is needed for generating exceptions
    when(mockMemberComms.getMemberName()).thenReturn("membername");
    Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
    spySub = spy(subproc);
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
    addCommitAnswer();
  }

  /**
   * Make the mock RPCs answer an 'acquired' notification by telling the member that the global
   * barrier was reached, i.e. by moving the subprocedure into its in-barrier phase.
   */
  private void addCommitAnswer() throws IOException {
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        member.receivedReachedGlobalBarrier(op);
        return null;
      }
    }).when(mockMemberComms).sendMemberAcquired(any());
  }

  /**
   * Test the normal sub procedure execution case.
   */
  @Test
  public void testSimpleRun() throws Exception {
    member = buildCohortMember();
    EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
    EmptySubprocedure spy = spy(subproc);
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);

    // when we get a prepare, then start the commit phase
    addCommitAnswer();

    // build and run the operation
    Subprocedure subproc1 = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc1);
    // wait for the subprocedure that was actually submitted (the spy) to finish, rather than
    // relying on the un-spied original sharing its latch with the spy
    subproc1.waitForLocallyCompleted();

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spy);
    order.verify(spy).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
    order.verify(spy).insideBarrier();
    order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
    order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), any());
  }

  /**
   * Make sure we call cleanup etc, when we have an exception during
   * {@link Subprocedure#acquireBarrier()}.
   */
  @Test
  public void testMemberPrepareException() throws Exception {
    buildCohortMemberPair();

    // force an exception in Subprocedure#acquireBarrier (the prepare phase)
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        throw new IOException("Forced IOException in member acquireBarrier");
      }
    }).when(spySub).acquireBarrier();

    // build and run the operation
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    // Later phases not run
    order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
    order.verify(spySub, never()).insideBarrier();
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Make sure we call cleanup etc, when telling the coordinator that the barrier was acquired
   * fails (i.e. {@code sendMemberAcquired} throws).
   */
  @Test
  public void testSendMemberAcquiredCommsFailure() throws Exception {
    buildCohortMemberPair();

    // force a comms failure when reporting that the barrier was acquired
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        throw new IOException("Forced IOException in member prepare");
      }
    }).when(mockMemberComms).sendMemberAcquired(any());

    // build and run the operation
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));

    // Later phases not run
    order.verify(spySub, never()).insideBarrier();
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
   * running {@link Subprocedure#acquireBarrier()} -- prepare needs to finish first, and then the
   * abort is checked. Thus, the {@link Subprocedure#acquireBarrier()} should succeed but later get
   * rolled back via {@link Subprocedure#cleanup}.
   */
  @Test
  public void testCoordinatorAbort() throws Exception {
    buildCohortMemberPair();

    // mock that another node timed out or failed to prepare
    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        // inject a remote error (this would have come from an external thread)
        spySub.cancel("bogus message", oate);
        // sleep the wake frequency since that is what we promised
        Thread.sleep(WAKE_FREQUENCY);
        return null;
      }
    }).when(spySub).waitForReachedGlobalBarrier();

    // build and run the operation
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
    // Later phases not run
    order.verify(spySub, never()).insideBarrier();
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Handle failures if a member's commit phase fails.
   * <p>
   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
   * the transaction is committed just before the coordinator sends commit messages to the member.
   * Members are then responsible for reading its TX log. This implementation actually rolls back,
   * and thus breaks the normal TX guarantees.
   */
  @Test
  public void testMemberCommitException() throws Exception {
    buildCohortMemberPair();

    // force an exception in Subprocedure#insideBarrier (the commit phase)
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        throw new IOException("Forced IOException in member prepare");
      }
    }).when(spySub).insideBarrier();

    // build and run the operation
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
    order.verify(spySub).insideBarrier();

    // Later phases not run
    order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Handle Failures if a member's commit phase succeeds but notification to coordinator fails.
   * <p>
   * NOTE: This is the core difference that makes this different from traditional 2PC. In true 2PC
   * the transaction is committed just before the coordinator sends commit messages to the member.
   * Members are then responsible for reading its TX log. This implementation actually rolls back,
   * and thus breaks the normal TX guarantees.
   */
  @Test
  public void testMemberCommitCommsFailure() throws Exception {
    buildCohortMemberPair();
    final TimeoutException oate = new TimeoutException("bogus timeout", 1, 2, 0);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        // inject a remote error (this would have come from an external thread)
        spySub.cancel("commit comms fail", oate);
        // sleep the wake frequency since that is what we promised
        Thread.sleep(WAKE_FREQUENCY);
        return null;
      }
    }).when(mockMemberComms).sendMemberCompleted(any(), eq(data));

    // build and run the operation
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    member.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spySub);
    order.verify(spySub).acquireBarrier();
    order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
    order.verify(spySub).insideBarrier();
    order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
    // error recovery path exercised
    order.verify(spySub).cancel(anyString(), any());
    order.verify(spySub).cleanup(any());
  }

  /**
   * Fail correctly on getting an external error while waiting for the prepared latch.
   * @throws Exception on failure
   */
  @Test
  public void testPropagateConnectionErrorBackToManager() throws Exception {
    // setup the operation
    member = buildCohortMember();
    ProcedureMember memberSpy = spy(member);

    // setup the commit and the spy
    final ForeignExceptionDispatcher commitDispatcher = new ForeignExceptionDispatcher();
    // NOTE(review): dispSpy is not the dispatcher handed to the subprocedure below, so it only
    // matters for the (currently disabled) receiveError verification at the end of this test.
    ForeignExceptionDispatcher dispSpy = spy(commitDispatcher);
    Subprocedure commit = new EmptySubprocedure(member, commitDispatcher);
    Subprocedure spy = spy(commit);
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);

    // fail during the prepare phase
    doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
    // and throw a connection error when we try to tell the controller about it
    doThrow(new IOException("Controller is down!")).when(mockMemberComms).sendMemberAborted(eq(spy),
      any());

    // build and run the operation
    Subprocedure subproc = memberSpy.createSubprocedure(op, data);
    memberSpy.submitSubprocedure(subproc);
    // if the operation doesn't die properly, then this will timeout
    memberSpy.closeAndWait(TIMEOUT);

    // make sure everything ran in order
    InOrder order = inOrder(mockMemberComms, spy, dispSpy);
    // make sure we acquire.
    order.verify(spy).acquireBarrier();
    order.verify(mockMemberComms, never()).sendMemberAcquired(spy);

    // TODO Need to do another refactor to get this to propagate to the coordinator.
    // make sure we pass a remote exception back the controller
    // order.verify(mockMemberComms).sendMemberAborted(eq(spy),
    // any());
    // order.verify(dispSpy).receiveError(anyString(),
    // any(), any());
  }

  /**
   * Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
   * correctly build a new task for the requested operation.
   * @throws Exception on failure
   */
  @Test
  public void testNoTaskToBeRunFromRequest() throws Exception {
    ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
    when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null).thenThrow(
      new IllegalStateException("Wrong state!"),
      new IllegalArgumentException("can't understand the args"));
    member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
    // first call: the builder returns null
    Subprocedure subproc = member.createSubprocedure(op, data);
    member.submitSubprocedure(subproc);
    // second call: the builder throws an illegal state exception, which must propagate
    try {
      Subprocedure subproc2 = member.createSubprocedure(op, data);
      member.submitSubprocedure(subproc2);
      throw new AssertionError("Expected IllegalStateException from the subprocedure builder");
    } catch (IllegalStateException expected) {
      // expected: the builder failure surfaces before anything is handed to the pool
    }
    // third call: the builder throws an illegal argument exception, which must propagate
    try {
      Subprocedure subproc3 = member.createSubprocedure(op, data);
      member.submitSubprocedure(subproc3);
      throw new AssertionError("Expected IllegalArgumentException from the subprocedure builder");
    } catch (IllegalArgumentException expected) {
      // expected: the builder failure surfaces before anything is handed to the pool
    }

    // no request should reach the pool
    verifyNoInteractions(pool);
    // get two abort requests
    // TODO Need to do another refactor to get this to propagate to the coordinator.
    // verify(mockMemberComms, times(2)).sendMemberAborted(any(), any());
  }

  /**
   * Helper {@link Subprocedure} whose phase for each step is just empty. It runs this test's
   * {@code op} with {@code WAKE_FREQUENCY} as the wake frequency and {@code TIMEOUT} as the
   * timeout, both of which are arbitrary test values.
   */
  public class EmptySubprocedure extends SubprocedureImpl {
    public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
      super(member, op, dispatcher, WAKE_FREQUENCY, TIMEOUT);
    }
  }
}