001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.mockito.Mockito.never; 023import static org.mockito.Mockito.spy; 024import static org.mockito.Mockito.times; 025import static org.mockito.Mockito.verify; 026 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CountDownLatch; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.junit.AfterClass; 039import org.junit.BeforeClass; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.mockito.Mockito; 044import org.mockito.invocation.InvocationOnMock; 045import org.mockito.stubbing.Answer; 046import org.mockito.verification.VerificationMode; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 051 052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 053 054/** 055 * Test zookeeper-based, procedure controllers 056 */ 057@Category({ MasterTests.class, MediumTests.class }) 058public class TestZKProcedureControllers { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestZKProcedureControllers.class); 063 064 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class); 065 private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); 066 private static final String COHORT_NODE_NAME = "expected"; 067 private static final String CONTROLLER_NODE_NAME = "controller"; 068 private static final VerificationMode once = Mockito.times(1); 069 070 private final byte[] memberData = new String("data from member").getBytes(); 071 072 @BeforeClass 073 public static void setupTest() throws Exception { 074 UTIL.startMiniZKCluster(); 075 } 076 077 @AfterClass 078 public static void cleanupTest() throws Exception { 079 UTIL.shutdownMiniZKCluster(); 080 } 081 082 /** 083 * Smaller test to just test the actuation on the cohort member 084 * @throws Exception on failure 085 */ 086 @Test 087 public void testSimpleZKCohortMemberController() throws Exception { 088 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 089 final String operationName = "instanceTest"; 090 091 final Subprocedure sub = Mockito.mock(Subprocedure.class); 092 Mockito.when(sub.getName()).thenReturn(operationName); 093 094 final byte[] data = new byte[] { 1, 2, 3 }; 095 final CountDownLatch prepared = new CountDownLatch(1); 096 final CountDownLatch committed = new CountDownLatch(1); 097 098 final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher()); 099 final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, "testSimple"); 100 101 // mock out cohort member callbacks 102 final ProcedureMember member = Mockito.mock(ProcedureMember.class); 103 Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data); 104 Mockito.doAnswer(new Answer<Void>() { 105 @Override 106 public Void answer(InvocationOnMock invocation) throws Throwable { 107 controller.sendMemberAcquired(sub); 108 prepared.countDown(); 109 return null; 110 } 111 }).when(member).submitSubprocedure(sub); 112 Mockito.doAnswer(new Answer<Void>() { 113 @Override 114 public Void answer(InvocationOnMock invocation) throws Throwable { 115 controller.sendMemberCompleted(sub, memberData); 116 committed.countDown(); 117 return null; 118 } 119 }).when(member).receivedReachedGlobalBarrier(operationName); 120 121 // start running the listener 122 controller.start(COHORT_NODE_NAME, member); 123 124 // set a prepare node from a 'coordinator' 125 String prepare = 126 ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName); 127 ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data)); 128 // wait for the operation to be prepared 129 prepared.await(); 130 131 // create the commit node so we update the operation to enter the commit phase 132 String commit = 133 ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName); 134 LOG.debug("Found prepared, posting commit node:" + commit); 135 ZKUtil.createAndFailSilent(watcher, commit); 136 LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit)); 137 committed.await(); 138 139 verify(monitor, never()).receive(Mockito.any()); 140 // XXX: broken due to composition. 141 // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(), 142 // Mockito.any()); 143 // cleanup after the test 144 ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode()); 145 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare)); 146 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit)); 147 } 148 149 @Test 150 public void testZKCoordinatorControllerWithNoCohort() throws Exception { 151 final String operationName = "no cohort controller test"; 152 final byte[] data = new byte[] { 1, 2, 3 }; 153 154 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data); 155 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data); 156 } 157 158 @Test 159 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception { 160 final String operationName = "single member controller test"; 161 final byte[] data = new byte[] { 1, 2, 3 }; 162 163 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort"); 164 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort"); 165 } 166 167 @Test 168 public void testZKCoordinatorControllerMultipleCohort() throws Exception { 169 final String operationName = "multi member controller test"; 170 final byte[] data = new byte[] { 1, 2, 3 }; 171 172 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort", 173 "cohort2", "cohort3"); 174 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort", 175 "cohort2", "cohort3"); 176 } 177 178 private void runMockCommitWithOrchestratedControllers(StartControllers controllers, 179 String operationName, byte[] data, String... cohort) throws Exception { 180 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 181 List<String> expected = Lists.newArrayList(cohort); 182 183 final Subprocedure sub = Mockito.mock(Subprocedure.class); 184 Mockito.when(sub.getName()).thenReturn(operationName); 185 186 CountDownLatch prepared = new CountDownLatch(expected.size()); 187 CountDownLatch committed = new CountDownLatch(expected.size()); 188 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 189 190 // mock out coordinator so we can keep track of zk progress 191 ProcedureCoordinator coordinator = 192 setupMockCoordinator(operationName, prepared, committed, dataFromMembers); 193 194 ProcedureMember member = Mockito.mock(ProcedureMember.class); 195 196 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher, 197 operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 198 ZKProcedureCoordinator controller = pair.getFirst(); 199 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 200 // start the operation 201 Procedure p = Mockito.mock(Procedure.class); 202 Mockito.when(p.getName()).thenReturn(operationName); 203 204 controller.sendGlobalBarrierAcquire(p, data, expected); 205 206 // post the prepare node for each expected node 207 for (ZKProcedureMemberRpcs cc : cohortControllers) { 208 cc.sendMemberAcquired(sub); 209 } 210 211 // wait for all the notifications to reach the coordinator 212 prepared.await(); 213 // make sure we got the all the nodes and no more 214 Mockito.verify(coordinator, times(expected.size())) 215 .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 216 217 // kick off the commit phase 218 controller.sendGlobalBarrierReached(p, expected); 219 220 // post the committed node for each expected node 221 for (ZKProcedureMemberRpcs cc : cohortControllers) { 222 cc.sendMemberCompleted(sub, memberData); 223 } 224 225 // wait for all commit notifications to reach the coordinator 226 committed.await(); 227 // make sure we got the all the nodes and no more 228 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier( 229 Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData)); 230 231 assertEquals("Incorrect number of members returnd data", expected.size(), 232 dataFromMembers.size()); 233 for (byte[] result : dataFromMembers) { 234 assertArrayEquals("Incorrect data from member", memberData, result); 235 } 236 237 controller.resetMembers(p); 238 239 // verify all behavior 240 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 241 verifyCohort(member, cohortControllers.size(), operationName, data); 242 verifyCoordinator(operationName, coordinator, expected); 243 } 244 245 // TODO Broken by composition. 246 // @Test 247 // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception { 248 // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 249 // "cohort1", "cohort2"); 250 // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 251 // "cohort1", "cohort2"); 252 // } 253 254 public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data, 255 String... cohort) throws Exception { 256 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 257 List<String> expected = Lists.newArrayList(cohort); 258 259 final Subprocedure sub = Mockito.mock(Subprocedure.class); 260 Mockito.when(sub.getName()).thenReturn(operationName); 261 262 final CountDownLatch prepared = new CountDownLatch(expected.size()); 263 final CountDownLatch committed = new CountDownLatch(expected.size()); 264 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 265 266 // mock out coordinator so we can keep track of zk progress 267 ProcedureCoordinator coordinator = 268 setupMockCoordinator(operationName, prepared, committed, dataFromMembers); 269 270 ProcedureMember member = Mockito.mock(ProcedureMember.class); 271 Procedure p = Mockito.mock(Procedure.class); 272 Mockito.when(p.getName()).thenReturn(operationName); 273 274 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher, 275 operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 276 ZKProcedureCoordinator controller = pair.getFirst(); 277 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 278 279 // post 1/2 the prepare nodes early 280 for (int i = 0; i < cohortControllers.size() / 2; i++) { 281 cohortControllers.get(i).sendMemberAcquired(sub); 282 } 283 284 // start the operation 285 controller.sendGlobalBarrierAcquire(p, data, expected); 286 287 // post the prepare node for each expected node 288 for (ZKProcedureMemberRpcs cc : cohortControllers) { 289 cc.sendMemberAcquired(sub); 290 } 291 292 // wait for all the notifications to reach the coordinator 293 prepared.await(); 294 // make sure we got the all the nodes and no more 295 Mockito.verify(coordinator, times(expected.size())) 296 .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 297 298 // kick off the commit phase 299 controller.sendGlobalBarrierReached(p, expected); 300 301 // post the committed node for each expected node 302 for (ZKProcedureMemberRpcs cc : cohortControllers) { 303 cc.sendMemberCompleted(sub, memberData); 304 } 305 306 // wait for all commit notifications to reach the coordiantor 307 committed.await(); 308 // make sure we got the all the nodes and no more 309 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier( 310 Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData)); 311 312 controller.resetMembers(p); 313 314 // verify all behavior 315 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 316 verifyCohort(member, cohortControllers.size(), operationName, data); 317 verifyCoordinator(operationName, coordinator, expected); 318 } 319 320 /** 321 * @return a mock {@link ProcedureCoordinator} that just counts down the prepared and committed 322 * latch for called to the respective method 323 */ 324 private ProcedureCoordinator setupMockCoordinator(String operationName, 325 final CountDownLatch prepared, final CountDownLatch committed, 326 final ArrayList<byte[]> dataFromMembers) { 327 ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class); 328 Mockito.doAnswer(new Answer<Void>() { 329 @Override 330 public Void answer(InvocationOnMock invocation) throws Throwable { 331 prepared.countDown(); 332 return null; 333 } 334 }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 335 Mockito.doAnswer(new Answer<Void>() { 336 @Override 337 public Void answer(InvocationOnMock invocation) throws Throwable { 338 dataFromMembers.add(memberData); 339 committed.countDown(); 340 return null; 341 } 342 }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(), 343 Mockito.eq(memberData)); 344 return coordinator; 345 } 346 347 /** 348 * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper 349 */ 350 private void verifyZooKeeperClean(String operationName, ZKWatcher watcher, 351 ZKProcedureUtil controller) throws Exception { 352 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName); 353 String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName); 354 String abort = ZKProcedureUtil.getAbortNode(controller, operationName); 355 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare)); 356 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit)); 357 assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort)); 358 } 359 360 /** 361 * Verify the cohort controller got called once per expected node to start the operation 362 */ 363 private void verifyCohort(ProcedureMember member, int cohortSize, String operationName, 364 byte[] data) { 365 // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName), 366 // (byte[]) Mockito.argThat(new ArrayEquals(data))); 367 Mockito.verify(member, Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any()); 368 369 } 370 371 /** 372 * Verify that the coordinator only got called once for each expected node 373 */ 374 private void verifyCoordinator(String operationName, ProcedureCoordinator coordinator, 375 List<String> expected) { 376 // verify that we got all the expected nodes 377 for (String node : expected) { 378 verify(coordinator, once).memberAcquiredBarrier(operationName, node); 379 verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData); 380 } 381 } 382 383 /** 384 * Specify how the controllers that should be started (not spy/mockable) for the test. 385 */ 386 private abstract class StartControllers { 387 public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start( 388 ZKWatcher watcher, String operationName, ProcedureCoordinator coordinator, 389 String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception; 390 } 391 392 private final StartControllers startCoordinatorFirst = new StartControllers() { 393 394 @Override 395 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher, 396 String operationName, ProcedureCoordinator coordinator, String controllerName, 397 ProcedureMember member, List<String> expected) throws Exception { 398 // start the controller 399 ZKProcedureCoordinator controller = 400 new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME); 401 controller.start(coordinator); 402 403 // make a cohort controller for each expected node 404 405 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 406 for (String nodeName : expected) { 407 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 408 cc.start(nodeName, member); 409 cohortControllers.add(cc); 410 } 411 return new Pair<>(controller, cohortControllers); 412 } 413 }; 414 415 /** 416 * Check for the possible race condition where a cohort member starts after the controller and 417 * therefore could miss a new operation 418 */ 419 private final StartControllers startCohortFirst = new StartControllers() { 420 421 @Override 422 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher, 423 String operationName, ProcedureCoordinator coordinator, String controllerName, 424 ProcedureMember member, List<String> expected) throws Exception { 425 426 // make a cohort controller for each expected node 427 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 428 for (String nodeName : expected) { 429 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 430 cc.start(nodeName, member); 431 cohortControllers.add(cc); 432 } 433 434 // start the controller 435 ZKProcedureCoordinator controller = 436 new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME); 437 controller.start(coordinator); 438 439 return new Pair<>(controller, cohortControllers); 440 } 441 }; 442}