001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseZKTestingUtil; 031import org.apache.hadoop.hbase.Stoppable; 032import org.apache.hadoop.hbase.log.HBaseMarkers; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.testclassification.ZKTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.Threads; 037import org.junit.AfterClass; 038import org.junit.BeforeClass; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@Category({ ZKTests.class, MediumTests.class }) 046public class TestZKLeaderManager { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestZKLeaderManager.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestZKLeaderManager.class); 053 054 private static final String LEADER_ZNODE = "/test/" + TestZKLeaderManager.class.getSimpleName(); 055 056 private static class MockAbortable implements Abortable { 057 private boolean aborted; 058 059 @Override 060 public void abort(String why, Throwable e) { 061 aborted = true; 062 LOG.error(HBaseMarkers.FATAL, "Aborting during test: " + why, e); 063 fail("Aborted during test: " + why); 064 } 065 066 @Override 067 public boolean isAborted() { 068 return aborted; 069 } 070 } 071 072 private static class MockLeader extends Thread implements Stoppable { 073 private volatile boolean stopped; 074 private ZKWatcher watcher; 075 private ZKLeaderManager zkLeader; 076 private AtomicBoolean master = new AtomicBoolean(false); 077 private int index; 078 079 MockLeader(ZKWatcher watcher, int index) { 080 setDaemon(true); 081 setName("TestZKLeaderManager-leader-" + index); 082 this.index = index; 083 this.watcher = watcher; 084 this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, Bytes.toBytes(index), this); 085 } 086 087 public boolean isMaster() { 088 return master.get(); 089 } 090 091 public int getIndex() { 092 return index; 093 } 094 095 public ZKWatcher getWatcher() { 096 return watcher; 097 } 098 099 @Override 100 public void run() { 101 while (!stopped) { 102 zkLeader.start(); 103 zkLeader.waitToBecomeLeader(); 104 master.set(true); 105 106 while (master.get() && !stopped) { 107 try { 108 Thread.sleep(10); 109 } catch (InterruptedException ignored) { 110 } 111 } 112 } 113 } 114 115 void abdicate() { 116 zkLeader.stepDownAsLeader(); 117 master.set(false); 118 } 119 120 @Override 121 public void stop(String why) { 122 stopped = true; 123 abdicate(); 124 Threads.sleep(100); 125 watcher.close(); 126 } 127 128 @Override 129 public boolean isStopped() { 130 return stopped; 131 } 132 } 133 134 private static HBaseZKTestingUtil TEST_UTIL; 135 private static MockLeader[] CANDIDATES; 136 137 @BeforeClass 138 public static void setupBeforeClass() throws Exception { 139 TEST_UTIL = new HBaseZKTestingUtil(); 140 TEST_UTIL.startMiniZKCluster(); 141 Configuration conf = TEST_UTIL.getConfiguration(); 142 143 // use an abortable to fail the test in the case of any KeeperExceptions 144 MockAbortable abortable = new MockAbortable(); 145 int count = 5; 146 CANDIDATES = new MockLeader[count]; 147 for (int i = 0; i < count; i++) { 148 ZKWatcher watcher = newZK(conf, "server" + i, abortable); 149 CANDIDATES[i] = new MockLeader(watcher, i); 150 CANDIDATES[i].start(); 151 } 152 } 153 154 @AfterClass 155 public static void tearDownAfterClass() throws Exception { 156 TEST_UTIL.shutdownMiniZKCluster(); 157 } 158 159 @Test 160 public void testLeaderSelection() throws Exception { 161 MockLeader currentLeader = getCurrentLeader(); 162 // one leader should have been found 163 assertNotNull("Leader should exist", currentLeader); 164 LOG.debug("Current leader index is " + currentLeader.getIndex()); 165 166 byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 167 assertNotNull("Leader znode should contain leader index", znodeData); 168 assertTrue("Leader znode should not be empty", znodeData.length > 0); 169 int storedIndex = Bytes.toInt(znodeData); 170 LOG.debug("Stored leader index in ZK is " + storedIndex); 171 assertEquals("Leader znode should match leader index", currentLeader.getIndex(), storedIndex); 172 173 // force a leader transition 174 currentLeader.abdicate(); 175 176 // check for new leader 177 currentLeader = getCurrentLeader(); 178 // one leader should have been found 179 assertNotNull("New leader should exist after abdication", currentLeader); 180 LOG.debug("New leader index is " + currentLeader.getIndex()); 181 182 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 183 assertNotNull("Leader znode should contain leader index", znodeData); 184 assertTrue("Leader znode should not be empty", znodeData.length > 0); 185 storedIndex = Bytes.toInt(znodeData); 186 LOG.debug("Stored leader index in ZK is " + storedIndex); 187 assertEquals("Leader znode should match leader index", currentLeader.getIndex(), storedIndex); 188 189 // force another transition by stopping the current 190 currentLeader.stop("Stopping for test"); 191 192 // check for new leader 193 currentLeader = getCurrentLeader(); 194 // one leader should have been found 195 assertNotNull("New leader should exist after stop", currentLeader); 196 LOG.debug("New leader index is " + currentLeader.getIndex()); 197 198 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 199 assertNotNull("Leader znode should contain leader index", znodeData); 200 assertTrue("Leader znode should not be empty", znodeData.length > 0); 201 storedIndex = Bytes.toInt(znodeData); 202 LOG.debug("Stored leader index in ZK is " + storedIndex); 203 assertEquals("Leader znode should match leader index", currentLeader.getIndex(), storedIndex); 204 205 // with a second stop we can guarantee that a previous leader has resumed leading 206 currentLeader.stop("Stopping for test"); 207 208 // check for new 209 currentLeader = getCurrentLeader(); 210 assertNotNull("New leader should exist", currentLeader); 211 } 212 213 private MockLeader getCurrentLeader() { 214 MockLeader currentLeader = null; 215 216 // Wait up to 10 secs for initial leader 217 for (int i = 0; i < 1000; i++) { 218 for (int j = 0; j < CANDIDATES.length; j++) { 219 if (CANDIDATES[j].isMaster()) { 220 // should only be one leader 221 if (currentLeader != null) { 222 fail( 223 "Both candidate " + currentLeader.getIndex() + " and " + j + " claim to be leader!"); 224 } 225 currentLeader = CANDIDATES[j]; 226 } 227 } 228 if (currentLeader != null) { 229 break; 230 } 231 Threads.sleep(100); 232 } 233 return currentLeader; 234 } 235 236 private static ZKWatcher newZK(Configuration conf, String name, Abortable abort) 237 throws Exception { 238 Configuration copy = HBaseConfiguration.create(conf); 239 return new ZKWatcher(copy, name, abort); 240 } 241}