001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.mockito.Mockito.never; 023import static org.mockito.Mockito.spy; 024import static org.mockito.Mockito.times; 025import static org.mockito.Mockito.verify; 026 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.CountDownLatch; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtil; 032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil; 038import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 039import org.junit.AfterClass; 040import org.junit.BeforeClass; 041import org.junit.ClassRule; 042import org.junit.Test; 043import org.junit.experimental.categories.Category; 044import org.mockito.Mockito; 045import org.mockito.invocation.InvocationOnMock; 046import org.mockito.stubbing.Answer; 047import org.mockito.verification.VerificationMode; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 052 053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 054 055/** 056 * Test zookeeper-based, procedure controllers 057 */ 058@Category({ MasterTests.class, MediumTests.class }) 059public class TestZKProcedureControllers { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestZKProcedureControllers.class); 064 065 private static final Logger LOG = LoggerFactory.getLogger(TestZKProcedureControllers.class); 066 private final static HBaseTestingUtil UTIL = new HBaseTestingUtil(); 067 private static final String COHORT_NODE_NAME = "expected"; 068 private static final String CONTROLLER_NODE_NAME = "controller"; 069 private static final VerificationMode once = Mockito.times(1); 070 071 private final byte[] memberData = Bytes.toBytes("data from member"); 072 073 @BeforeClass 074 public static void setupTest() throws Exception { 075 UTIL.startMiniZKCluster(); 076 } 077 078 @AfterClass 079 public static void cleanupTest() throws Exception { 080 UTIL.shutdownMiniZKCluster(); 081 } 082 083 /** 084 * Smaller test to just test the actuation on the cohort member 085 * @throws Exception on failure 086 */ 087 @Test 088 public void testSimpleZKCohortMemberController() throws Exception { 089 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 090 final String operationName = "instanceTest"; 091 092 final Subprocedure sub = Mockito.mock(Subprocedure.class); 093 Mockito.when(sub.getName()).thenReturn(operationName); 094 095 final byte[] data = new byte[] { 1, 2, 3 }; 096 final CountDownLatch prepared = new CountDownLatch(1); 097 final CountDownLatch committed = new CountDownLatch(1); 098 099 final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher()); 100 final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, "testSimple"); 101 102 // mock out cohort member callbacks 103 final ProcedureMember member = Mockito.mock(ProcedureMember.class); 104 Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data); 105 Mockito.doAnswer(new Answer<Void>() { 106 @Override 107 public Void answer(InvocationOnMock invocation) throws Throwable { 108 controller.sendMemberAcquired(sub); 109 prepared.countDown(); 110 return null; 111 } 112 }).when(member).submitSubprocedure(sub); 113 Mockito.doAnswer(new Answer<Void>() { 114 @Override 115 public Void answer(InvocationOnMock invocation) throws Throwable { 116 controller.sendMemberCompleted(sub, memberData); 117 committed.countDown(); 118 return null; 119 } 120 }).when(member).receivedReachedGlobalBarrier(operationName); 121 122 // start running the listener 123 controller.start(COHORT_NODE_NAME, member); 124 125 // set a prepare node from a 'coordinator' 126 String prepare = 127 ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName); 128 ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data)); 129 // wait for the operation to be prepared 130 prepared.await(); 131 132 // create the commit node so we update the operation to enter the commit phase 133 String commit = 134 ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName); 135 LOG.debug("Found prepared, posting commit node:" + commit); 136 ZKUtil.createAndFailSilent(watcher, commit); 137 LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit)); 138 committed.await(); 139 140 verify(monitor, never()).receive(Mockito.any()); 141 // XXX: broken due to composition. 142 // verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(), 143 // Mockito.any()); 144 // cleanup after the test 145 ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode()); 146 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare)); 147 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit)); 148 } 149 150 @Test 151 public void testZKCoordinatorControllerWithNoCohort() throws Exception { 152 final String operationName = "no cohort controller test"; 153 final byte[] data = new byte[] { 1, 2, 3 }; 154 155 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data); 156 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data); 157 } 158 159 @Test 160 public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception { 161 final String operationName = "single member controller test"; 162 final byte[] data = new byte[] { 1, 2, 3 }; 163 164 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort"); 165 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort"); 166 } 167 168 @Test 169 public void testZKCoordinatorControllerMultipleCohort() throws Exception { 170 final String operationName = "multi member controller test"; 171 final byte[] data = new byte[] { 1, 2, 3 }; 172 173 runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort", 174 "cohort2", "cohort3"); 175 runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort", 176 "cohort2", "cohort3"); 177 } 178 179 private void runMockCommitWithOrchestratedControllers(StartControllers controllers, 180 String operationName, byte[] data, String... cohort) throws Exception { 181 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 182 List<String> expected = Lists.newArrayList(cohort); 183 184 final Subprocedure sub = Mockito.mock(Subprocedure.class); 185 Mockito.when(sub.getName()).thenReturn(operationName); 186 187 CountDownLatch prepared = new CountDownLatch(expected.size()); 188 CountDownLatch committed = new CountDownLatch(expected.size()); 189 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 190 191 // mock out coordinator so we can keep track of zk progress 192 ProcedureCoordinator coordinator = 193 setupMockCoordinator(operationName, prepared, committed, dataFromMembers); 194 195 ProcedureMember member = Mockito.mock(ProcedureMember.class); 196 197 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher, 198 operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 199 ZKProcedureCoordinator controller = pair.getFirst(); 200 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 201 // start the operation 202 Procedure p = Mockito.mock(Procedure.class); 203 Mockito.when(p.getName()).thenReturn(operationName); 204 205 controller.sendGlobalBarrierAcquire(p, data, expected); 206 207 // post the prepare node for each expected node 208 for (ZKProcedureMemberRpcs cc : cohortControllers) { 209 cc.sendMemberAcquired(sub); 210 } 211 212 // wait for all the notifications to reach the coordinator 213 prepared.await(); 214 // make sure we got the all the nodes and no more 215 Mockito.verify(coordinator, times(expected.size())) 216 .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 217 218 // kick off the commit phase 219 controller.sendGlobalBarrierReached(p, expected); 220 221 // post the committed node for each expected node 222 for (ZKProcedureMemberRpcs cc : cohortControllers) { 223 cc.sendMemberCompleted(sub, memberData); 224 } 225 226 // wait for all commit notifications to reach the coordinator 227 committed.await(); 228 // make sure we got the all the nodes and no more 229 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier( 230 Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData)); 231 232 assertEquals("Incorrect number of members returnd data", expected.size(), 233 dataFromMembers.size()); 234 for (byte[] result : dataFromMembers) { 235 assertArrayEquals("Incorrect data from member", memberData, result); 236 } 237 238 controller.resetMembers(p); 239 240 // verify all behavior 241 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 242 verifyCohort(member, cohortControllers.size(), operationName, data); 243 verifyCoordinator(operationName, coordinator, expected); 244 } 245 246 // TODO Broken by composition. 247 // @Test 248 // public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception { 249 // runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 250 // "cohort1", "cohort2"); 251 // runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 }, 252 // "cohort1", "cohort2"); 253 // } 254 255 public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data, 256 String... cohort) throws Exception { 257 ZKWatcher watcher = UTIL.getZooKeeperWatcher(); 258 List<String> expected = Lists.newArrayList(cohort); 259 260 final Subprocedure sub = Mockito.mock(Subprocedure.class); 261 Mockito.when(sub.getName()).thenReturn(operationName); 262 263 final CountDownLatch prepared = new CountDownLatch(expected.size()); 264 final CountDownLatch committed = new CountDownLatch(expected.size()); 265 ArrayList<byte[]> dataFromMembers = new ArrayList<>(); 266 267 // mock out coordinator so we can keep track of zk progress 268 ProcedureCoordinator coordinator = 269 setupMockCoordinator(operationName, prepared, committed, dataFromMembers); 270 271 ProcedureMember member = Mockito.mock(ProcedureMember.class); 272 Procedure p = Mockito.mock(Procedure.class); 273 Mockito.when(p.getName()).thenReturn(operationName); 274 275 Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> pair = controllers.start(watcher, 276 operationName, coordinator, CONTROLLER_NODE_NAME, member, expected); 277 ZKProcedureCoordinator controller = pair.getFirst(); 278 List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond(); 279 280 // post 1/2 the prepare nodes early 281 for (int i = 0; i < cohortControllers.size() / 2; i++) { 282 cohortControllers.get(i).sendMemberAcquired(sub); 283 } 284 285 // start the operation 286 controller.sendGlobalBarrierAcquire(p, data, expected); 287 288 // post the prepare node for each expected node 289 for (ZKProcedureMemberRpcs cc : cohortControllers) { 290 cc.sendMemberAcquired(sub); 291 } 292 293 // wait for all the notifications to reach the coordinator 294 prepared.await(); 295 // make sure we got the all the nodes and no more 296 Mockito.verify(coordinator, times(expected.size())) 297 .memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 298 299 // kick off the commit phase 300 controller.sendGlobalBarrierReached(p, expected); 301 302 // post the committed node for each expected node 303 for (ZKProcedureMemberRpcs cc : cohortControllers) { 304 cc.sendMemberCompleted(sub, memberData); 305 } 306 307 // wait for all commit notifications to reach the coordiantor 308 committed.await(); 309 // make sure we got the all the nodes and no more 310 Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier( 311 Mockito.eq(operationName), Mockito.anyString(), Mockito.eq(memberData)); 312 313 controller.resetMembers(p); 314 315 // verify all behavior 316 verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil()); 317 verifyCohort(member, cohortControllers.size(), operationName, data); 318 verifyCoordinator(operationName, coordinator, expected); 319 } 320 321 /** 322 * @return a mock {@link ProcedureCoordinator} that just counts down the prepared and committed 323 * latch for called to the respective method 324 */ 325 private ProcedureCoordinator setupMockCoordinator(String operationName, 326 final CountDownLatch prepared, final CountDownLatch committed, 327 final ArrayList<byte[]> dataFromMembers) { 328 ProcedureCoordinator coordinator = Mockito.mock(ProcedureCoordinator.class); 329 Mockito.doAnswer(new Answer<Void>() { 330 @Override 331 public Void answer(InvocationOnMock invocation) throws Throwable { 332 prepared.countDown(); 333 return null; 334 } 335 }).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString()); 336 Mockito.doAnswer(new Answer<Void>() { 337 @Override 338 public Void answer(InvocationOnMock invocation) throws Throwable { 339 dataFromMembers.add(memberData); 340 committed.countDown(); 341 return null; 342 } 343 }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(), 344 Mockito.eq(memberData)); 345 return coordinator; 346 } 347 348 /** 349 * Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper 350 */ 351 private void verifyZooKeeperClean(String operationName, ZKWatcher watcher, 352 ZKProcedureUtil controller) throws Exception { 353 String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName); 354 String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName); 355 String abort = ZKProcedureUtil.getAbortNode(controller, operationName); 356 assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare)); 357 assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit)); 358 assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort)); 359 } 360 361 /** 362 * Verify the cohort controller got called once per expected node to start the operation 363 */ 364 private void verifyCohort(ProcedureMember member, int cohortSize, String operationName, 365 byte[] data) { 366 // verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName), 367 // (byte[]) Mockito.argThat(new ArrayEquals(data))); 368 Mockito.verify(member, Mockito.atLeast(cohortSize)).submitSubprocedure(Mockito.any()); 369 370 } 371 372 /** 373 * Verify that the coordinator only got called once for each expected node 374 */ 375 private void verifyCoordinator(String operationName, ProcedureCoordinator coordinator, 376 List<String> expected) { 377 // verify that we got all the expected nodes 378 for (String node : expected) { 379 verify(coordinator, once).memberAcquiredBarrier(operationName, node); 380 verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData); 381 } 382 } 383 384 /** 385 * Specify how the controllers that should be started (not spy/mockable) for the test. 386 */ 387 private abstract class StartControllers { 388 public abstract Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start( 389 ZKWatcher watcher, String operationName, ProcedureCoordinator coordinator, 390 String controllerName, ProcedureMember member, List<String> cohortNames) throws Exception; 391 } 392 393 private final StartControllers startCoordinatorFirst = new StartControllers() { 394 395 @Override 396 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher, 397 String operationName, ProcedureCoordinator coordinator, String controllerName, 398 ProcedureMember member, List<String> expected) throws Exception { 399 // start the controller 400 ZKProcedureCoordinator controller = 401 new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME); 402 controller.start(coordinator); 403 404 // make a cohort controller for each expected node 405 406 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 407 for (String nodeName : expected) { 408 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 409 cc.start(nodeName, member); 410 cohortControllers.add(cc); 411 } 412 return new Pair<>(controller, cohortControllers); 413 } 414 }; 415 416 /** 417 * Check for the possible race condition where a cohort member starts after the controller and 418 * therefore could miss a new operation 419 */ 420 private final StartControllers startCohortFirst = new StartControllers() { 421 422 @Override 423 public Pair<ZKProcedureCoordinator, List<ZKProcedureMemberRpcs>> start(ZKWatcher watcher, 424 String operationName, ProcedureCoordinator coordinator, String controllerName, 425 ProcedureMember member, List<String> expected) throws Exception { 426 427 // make a cohort controller for each expected node 428 List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<>(); 429 for (String nodeName : expected) { 430 ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName); 431 cc.start(nodeName, member); 432 cohortControllers.add(cc); 433 } 434 435 // start the controller 436 ZKProcedureCoordinator controller = 437 new ZKProcedureCoordinator(watcher, operationName, CONTROLLER_NODE_NAME); 438 controller.start(coordinator); 439 440 return new Pair<>(controller, cohortControllers); 441 } 442 }; 443}