001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.ArgumentMatchers.anyList; 024import static org.mockito.ArgumentMatchers.eq; 025import static org.mockito.Mockito.atMost; 026import static org.mockito.Mockito.never; 027import static org.mockito.Mockito.spy; 028import static org.mockito.Mockito.when; 029 030import java.io.IOException; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.List; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.ThreadPoolExecutor; 036import java.util.concurrent.atomic.AtomicInteger; 037import org.apache.hadoop.hbase.Abortable; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.errorhandling.ForeignException; 041import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 042import org.apache.hadoop.hbase.errorhandling.TimeoutException; 043import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl; 044import org.apache.hadoop.hbase.testclassification.MasterTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.Pair; 047import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.mockito.Mockito; 054import org.mockito.internal.matchers.ArrayEquals; 055import org.mockito.invocation.InvocationOnMock; 056import org.mockito.stubbing.Answer; 057import org.mockito.verification.VerificationMode; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 062 063/** 064 * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster 065 */ 066@Category({ MasterTests.class, MediumTests.class }) 067public class TestZKProcedure { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestZKProcedure.class); 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedure.class); 074 private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 075 private static final String COORDINATOR_NODE_NAME = "coordinator"; 076 private static final long KEEP_ALIVE = 100; // seconds 077 private static final int POOL_SIZE = 1; 078 private static final long TIMEOUT = 10000; // when debugging make this larger for debugging 079 private static final long WAKE_FREQUENCY = 500; 080 private static final String opName = "op"; 081 private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for? 082 private static final VerificationMode once = Mockito.times(1); 083 084 @BeforeClass 085 public static void setupTest() throws Exception { 086 UTIL.startMiniZKCluster(); 087 } 088 089 @AfterClass 090 public static void cleanupTest() throws Exception { 091 UTIL.shutdownMiniZKCluster(); 092 } 093 094 private static ZKWatcher newZooKeeperWatcher() throws IOException { 095 return new ZKWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() { 096 @Override 097 public void abort(String why, Throwable e) { 098 throw new RuntimeException("Unexpected abort in distributed three phase commit test:" + why, 099 e); 100 } 101 102 @Override 103 public boolean isAborted() { 104 return false; 105 } 106 }); 107 } 108 109 @Test 110 public void testEmptyMemberSet() throws Exception { 111 runCommit(); 112 } 113 114 @Test 115 public void testSingleMember() throws Exception { 116 runCommit("one"); 117 } 118 119 @Test 120 public void testMultipleMembers() throws Exception { 121 runCommit("one", "two", "three", "four"); 122 } 123 124 private void runCommit(String... members) throws Exception { 125 // make sure we just have an empty list 126 if (members == null) { 127 members = new String[0]; 128 } 129 List<String> expected = Arrays.asList(members); 130 131 // setup the constants 132 ZKWatcher coordZkw = newZooKeeperWatcher(); 133 String opDescription = "coordination test - " + members.length + " cohort members"; 134 135 // start running the controller 136 ZKProcedureCoordinator coordinatorComms = 137 new ZKProcedureCoordinator(coordZkw, opDescription, COORDINATOR_NODE_NAME); 138 ThreadPoolExecutor pool = 139 ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); 140 ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) { 141 @Override 142 public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, 143 byte[] procArgs, List<String> expectedMembers) { 144 return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers)); 145 } 146 }; 147 148 // build and start members 149 // NOTE: There is a single subprocedure builder for all members here. 150 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class); 151 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = 152 new ArrayList<>(members.length); 153 // start each member 154 for (String member : members) { 155 ZKWatcher watcher = newZooKeeperWatcher(); 156 ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription); 157 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); 158 ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory); 159 procMembers.add(new Pair<>(procMember, comms)); 160 comms.start(member, procMember); 161 } 162 163 // setup mock member subprocedures 164 final List<Subprocedure> subprocs = new ArrayList<>(); 165 for (int i = 0; i < procMembers.size(); i++) { 166 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher(); 167 Subprocedure commit = Mockito.spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, 168 cohortMonitor, WAKE_FREQUENCY, TIMEOUT)); 169 subprocs.add(commit); 170 } 171 172 // link subprocedure to buildNewOperation invocation. 173 final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger 174 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName), 175 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() { 176 @Override 177 public Subprocedure answer(InvocationOnMock invocation) throws Throwable { 178 int index = i.getAndIncrement(); 179 LOG.debug("Task size:" + subprocs.size() + ", getting:" + index); 180 Subprocedure commit = subprocs.get(index); 181 return commit; 182 } 183 }); 184 185 // setup spying on the coordinator 186 // Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, 187 // expected)); 188 // Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc); 189 190 // start running the operation 191 Procedure task = 192 coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected); 193 // assertEquals("Didn't mock coordinator task", proc, task); 194 195 // verify all things ran as expected 196 // waitAndVerifyProc(proc, once, once, never(), once, false); 197 waitAndVerifyProc(task, once, once, never(), once, false); 198 verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false); 199 200 // close all the things 201 closeAll(coordinator, coordinatorComms, procMembers); 202 } 203 204 /** 205 * Test a distributed commit with multiple cohort members, where one of the cohort members has a 206 * timeout exception during the prepare stage. 207 */ 208 @Test 209 public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception { 210 String opDescription = "error injection coordination"; 211 String[] cohortMembers = new String[] { "one", "two", "three" }; 212 List<String> expected = Lists.newArrayList(cohortMembers); 213 // error constants 214 final int memberErrorIndex = 2; 215 final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1); 216 217 // start running the coordinator and its controller 218 ZKWatcher coordinatorWatcher = newZooKeeperWatcher(); 219 ZKProcedureCoordinator coordinatorController = 220 new ZKProcedureCoordinator(coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME); 221 ThreadPoolExecutor pool = 222 ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE); 223 ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool)); 224 225 // start a member for each node 226 SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class); 227 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<>(expected.size()); 228 for (String member : expected) { 229 ZKWatcher watcher = newZooKeeperWatcher(); 230 ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription); 231 ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE); 232 ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory); 233 members.add(new Pair<>(mem, controller)); 234 controller.start(member, mem); 235 } 236 237 // setup mock subprocedures 238 final List<Subprocedure> cohortTasks = new ArrayList<>(); 239 final int[] elem = new int[1]; 240 for (int i = 0; i < members.size(); i++) { 241 ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher(); 242 final ProcedureMember comms = members.get(i).getFirst(); 243 Subprocedure commit = 244 Mockito.spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT)); 245 // This nasty bit has one of the impls throw a TimeoutException 246 Mockito.doAnswer(new Answer<Void>() { 247 @Override 248 public Void answer(InvocationOnMock invocation) throws Throwable { 249 int index = elem[0]; 250 if (index == memberErrorIndex) { 251 LOG.debug("Sending error to coordinator"); 252 ForeignException remoteCause = 253 new ForeignException("TIMER", new TimeoutException("subprocTimeout", 1, 2, 0)); 254 Subprocedure r = ((Subprocedure) invocation.getMock()); 255 LOG.error("Remote commit failure, not propagating error:" + remoteCause); 256 comms.receiveAbortProcedure(r.getName(), remoteCause); 257 assertTrue(r.isComplete()); 258 // don't complete the error phase until the coordinator has gotten the error 259 // notification (which ensures that we never progress past prepare) 260 try { 261 Procedure.waitForLatch(coordinatorReceivedErrorLatch, 262 new ForeignExceptionDispatcher(), WAKE_FREQUENCY, "coordinator received error"); 263 } catch (InterruptedException e) { 264 LOG.debug("Wait for latch interrupted, done:" 265 + (coordinatorReceivedErrorLatch.getCount() == 0)); 266 // reset the interrupt status on the thread 267 Thread.currentThread().interrupt(); 268 } 269 } 270 elem[0] = ++index; 271 return null; 272 } 273 }).when(commit).acquireBarrier(); 274 cohortTasks.add(commit); 275 } 276 277 // pass out a task per member 278 final AtomicInteger taskIndex = new AtomicInteger(); 279 Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName), 280 (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(new Answer<Subprocedure>() { 281 @Override 282 public Subprocedure answer(InvocationOnMock invocation) throws Throwable { 283 int index = taskIndex.getAndIncrement(); 284 Subprocedure commit = cohortTasks.get(index); 285 return commit; 286 } 287 }); 288 289 // setup spying on the coordinator 290 ForeignExceptionDispatcher coordinatorTaskErrorMonitor = 291 Mockito.spy(new ForeignExceptionDispatcher()); 292 Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator, coordinatorTaskErrorMonitor, 293 WAKE_FREQUENCY, TIMEOUT, opName, data, expected)); 294 when(coordinator.createProcedure(any(), eq(opName), eq(data), anyList())) 295 .thenReturn(coordinatorTask); 296 // count down the error latch when we get the remote error 297 Mockito.doAnswer(new Answer<Void>() { 298 @Override 299 public Void answer(InvocationOnMock invocation) throws Throwable { 300 // pass on the error to the master 301 invocation.callRealMethod(); 302 // then count down the got error latch 303 coordinatorReceivedErrorLatch.countDown(); 304 return null; 305 } 306 }).when(coordinatorTask).receive(Mockito.any()); 307 308 // ---------------------------- 309 // start running the operation 310 // ---------------------------- 311 312 Procedure task = 313 coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected); 314 assertEquals("Didn't mock coordinator task", coordinatorTask, task); 315 316 // wait for the task to complete 317 try { 318 task.waitForCompleted(); 319 } catch (ForeignException fe) { 320 // this may get caught or may not 321 } 322 323 // ------------- 324 // verification 325 // ------------- 326 327 // always expect prepared, never committed, and possible to have cleanup and finish (racy since 328 // error case) 329 waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true); 330 verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once, once, true); 331 332 // close all the open things 333 closeAll(coordinator, coordinatorController, members); 334 } 335 336 /** 337 * Wait for the coordinator task to complete, and verify all the mocks 338 * @param proc the {@link Procedure} to execute 339 * @param prepare the mock prepare 340 * @param commit the mock commit 341 * @param cleanup the mock cleanup 342 * @param finish the mock finish 343 * @param opHasError the operation error state 344 * @throws Exception on unexpected failure 345 */ 346 private void waitAndVerifyProc(Procedure proc, VerificationMode prepare, VerificationMode commit, 347 VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception { 348 boolean caughtError = false; 349 try { 350 proc.waitForCompleted(); 351 } catch (ForeignException fe) { 352 caughtError = true; 353 } 354 // make sure that the task called all the expected phases 355 Mockito.verify(proc, prepare).sendGlobalBarrierStart(); 356 Mockito.verify(proc, commit).sendGlobalBarrierReached(); 357 Mockito.verify(proc, finish).sendGlobalBarrierComplete(); 358 assertEquals("Operation error state was unexpected", opHasError, 359 proc.getErrorMonitor().hasException()); 360 assertEquals("Operation error state was unexpected", opHasError, caughtError); 361 362 } 363 364 /** 365 * Wait for the coordinator task to complete, and verify all the mocks 366 * @param op the {@link Subprocedure} to use 367 * @param prepare the mock prepare 368 * @param commit the mock commit 369 * @param cleanup the mock cleanup 370 * @param finish the mock finish 371 * @param opHasError the operation error state 372 * @throws Exception on unexpected failure 373 */ 374 private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare, 375 VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError) 376 throws Exception { 377 boolean caughtError = false; 378 try { 379 op.waitForLocallyCompleted(); 380 } catch (ForeignException fe) { 381 caughtError = true; 382 } 383 // make sure that the task called all the expected phases 384 Mockito.verify(op, prepare).acquireBarrier(); 385 Mockito.verify(op, commit).insideBarrier(); 386 // We cannot guarantee that cleanup has run so we don't check it. 387 388 assertEquals("Operation error state was unexpected", opHasError, 389 op.getErrorCheckable().hasException()); 390 assertEquals("Operation error state was unexpected", opHasError, caughtError); 391 392 } 393 394 private void verifyCohortSuccessful(List<String> cohortNames, SubprocedureFactory subprocFactory, 395 Iterable<Subprocedure> cohortTasks, VerificationMode prepare, VerificationMode commit, 396 VerificationMode cleanup, VerificationMode finish, boolean opHasError) throws Exception { 397 398 // make sure we build the correct number of cohort members 399 Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())) 400 .buildSubprocedure(Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data))); 401 // verify that we ran each of the operations cleanly 402 int j = 0; 403 for (Subprocedure op : cohortTasks) { 404 LOG.debug("Checking mock:" + (j++)); 405 waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError); 406 } 407 } 408 409 private void closeAll(ProcedureCoordinator coordinator, 410 ZKProcedureCoordinator coordinatorController, 411 List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort) throws IOException { 412 // make sure we close all the resources 413 for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) { 414 member.getFirst().close(); 415 member.getSecond().close(); 416 } 417 coordinator.close(); 418 coordinatorController.close(); 419 } 420}