001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.Mockito.when; 026 027import java.io.IOException; 028import java.io.InterruptedIOException; 029import java.util.ArrayList; 030import java.util.List; 031import java.util.concurrent.Semaphore; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.hbase.ChoreService; 035import org.apache.hadoop.hbase.CoordinatedStateManager; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.Server; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.client.ClusterConnection; 041import org.apache.hadoop.hbase.client.Connection; 042import org.apache.hadoop.hbase.monitoring.MonitoredTask; 043import org.apache.hadoop.hbase.monitoring.TaskGroup; 044import org.apache.hadoop.hbase.testclassification.MasterTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; 048import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; 049import org.apache.hadoop.hbase.zookeeper.ZKListener; 050import org.apache.hadoop.hbase.zookeeper.ZKUtil; 051import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 052import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 053import org.apache.zookeeper.KeeperException; 054import org.junit.AfterClass; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.mockito.Mockito; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * Test the {@link ActiveMasterManager}. 065 */ 066@Category({ MasterTests.class, MediumTests.class }) 067public class TestActiveMasterManager { 068 069 @ClassRule 070 public static final HBaseClassTestRule CLASS_RULE = 071 HBaseClassTestRule.forClass(TestActiveMasterManager.class); 072 073 private final static Logger LOG = LoggerFactory.getLogger(TestActiveMasterManager.class); 074 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 075 076 @BeforeClass 077 public static void setUpBeforeClass() throws Exception { 078 TEST_UTIL.startMiniZKCluster(); 079 } 080 081 @AfterClass 082 public static void tearDownAfterClass() throws Exception { 083 TEST_UTIL.shutdownMiniZKCluster(); 084 } 085 086 @Test 087 public void testRestartMaster() throws IOException, KeeperException { 088 try (ZKWatcher zk = 089 new ZKWatcher(TEST_UTIL.getConfiguration(), "testActiveMasterManagerFromZK", null, true)) { 090 try { 091 ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode); 092 ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode); 093 } catch (KeeperException.NoNodeException nne) { 094 } 095 096 // Create the master node with a dummy address 097 ServerName master = ServerName.valueOf("localhost", 1, EnvironmentEdgeManager.currentTime()); 098 // Should not have a master yet 099 DummyMaster dummyMaster = new DummyMaster(zk, master); 100 ClusterStatusTracker clusterStatusTracker = dummyMaster.getClusterStatusTracker(); 101 ActiveMasterManager activeMasterManager = dummyMaster.getActiveMasterManager(); 102 assertFalse(activeMasterManager.clusterHasActiveMaster.get()); 103 assertFalse(activeMasterManager.getActiveMasterServerName().isPresent()); 104 105 // First test becoming the active master uninterrupted 106 TaskGroup status = mockTaskGroup(); 107 clusterStatusTracker.setClusterUp(); 108 109 activeMasterManager.blockUntilBecomingActiveMaster(100, status); 110 assertTrue(activeMasterManager.clusterHasActiveMaster.get()); 111 assertMaster(zk, master); 112 assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); 113 114 // Now pretend master restart 115 DummyMaster secondDummyMaster = new DummyMaster(zk, master); 116 ActiveMasterManager secondActiveMasterManager = secondDummyMaster.getActiveMasterManager(); 117 assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get()); 118 activeMasterManager.blockUntilBecomingActiveMaster(100, status); 119 assertTrue(activeMasterManager.clusterHasActiveMaster.get()); 120 assertMaster(zk, master); 121 assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); 122 assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get()); 123 } 124 } 125 126 /** 127 * Unit tests that uses ZooKeeper but does not use the master-side methods but rather acts 128 * directly on ZK. 129 */ 130 @Test 131 public void testActiveMasterManagerFromZK() throws Exception { 132 try (ZKWatcher zk = 133 new ZKWatcher(TEST_UTIL.getConfiguration(), "testActiveMasterManagerFromZK", null, true)) { 134 try { 135 ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode); 136 ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode); 137 } catch (KeeperException.NoNodeException nne) { 138 } 139 140 // Create the master node with a dummy address 141 ServerName firstMasterAddress = 142 ServerName.valueOf("localhost", 1, EnvironmentEdgeManager.currentTime()); 143 ServerName secondMasterAddress = 144 ServerName.valueOf("localhost", 2, EnvironmentEdgeManager.currentTime()); 145 146 // Should not have a master yet 147 DummyMaster ms1 = new DummyMaster(zk, firstMasterAddress); 148 ActiveMasterManager activeMasterManager = ms1.getActiveMasterManager(); 149 assertFalse(activeMasterManager.clusterHasActiveMaster.get()); 150 assertFalse(activeMasterManager.getActiveMasterServerName().isPresent()); 151 152 // First test becoming the active master uninterrupted 153 ClusterStatusTracker clusterStatusTracker = ms1.getClusterStatusTracker(); 154 clusterStatusTracker.setClusterUp(); 155 156 activeMasterManager.blockUntilBecomingActiveMaster(100, mockTaskGroup()); 157 assertTrue(activeMasterManager.clusterHasActiveMaster.get()); 158 assertMaster(zk, firstMasterAddress); 159 assertMaster(zk, activeMasterManager.getActiveMasterServerName().get()); 160 161 // New manager will now try to become the active master in another thread 162 WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress); 163 t.start(); 164 // Wait for this guy to figure out there is another active master 165 // Wait for 1 second at most 166 int sleeps = 0; 167 while (!t.manager.clusterHasActiveMaster.get() && sleeps < 100) { 168 Thread.sleep(10); 169 sleeps++; 170 } 171 172 // Both should see that there is an active master 173 assertTrue(activeMasterManager.clusterHasActiveMaster.get()); 174 assertTrue(t.manager.clusterHasActiveMaster.get()); 175 // But secondary one should not be the active master 176 assertFalse(t.isActiveMaster); 177 // Verify the active master ServerName is populated in standby master. 178 assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get()); 179 180 // Close the first server and delete it's master node 181 ms1.stop("stopping first server"); 182 183 // Use a listener to capture when the node is actually deleted 184 NodeDeletionListener listener = 185 new NodeDeletionListener(zk, zk.getZNodePaths().masterAddressZNode); 186 zk.registerListener(listener); 187 188 LOG.info("Deleting master node"); 189 ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode); 190 191 // Wait for the node to be deleted 192 LOG.info("Waiting for active master manager to be notified"); 193 listener.waitForDeletion(); 194 LOG.info("Master node deleted"); 195 196 // Now we expect the secondary manager to have and be the active master 197 // Wait for 1 second at most 198 sleeps = 0; 199 while (!t.isActiveMaster && sleeps < 100) { 200 Thread.sleep(10); 201 sleeps++; 202 } 203 LOG.debug("Slept " + sleeps + " times"); 204 205 assertTrue(t.manager.clusterHasActiveMaster.get()); 206 assertTrue(t.isActiveMaster); 207 assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get()); 208 209 LOG.info("Deleting master node"); 210 211 ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode); 212 } 213 } 214 215 @Test 216 public void testBackupMasterUpdates() throws Exception { 217 Configuration conf = TEST_UTIL.getConfiguration(); 218 try (ZKWatcher zk = new ZKWatcher(conf, "testBackupMasterUpdates", null, true)) { 219 ServerName sn1 = ServerName.valueOf("localhost", 1, -1); 220 DummyMaster master1 = new DummyMaster(zk, sn1); 221 ActiveMasterManager activeMasterManager = master1.getActiveMasterManager(); 222 activeMasterManager.blockUntilBecomingActiveMaster(100, mockTaskGroup()); 223 assertEquals(sn1, activeMasterManager.getActiveMasterServerName().get()); 224 assertEquals(0, activeMasterManager.getBackupMasters().size()); 225 // Add backup masters 226 List<String> backupZNodes = new ArrayList<>(); 227 for (int i = 1; i <= 10; i++) { 228 ServerName backupSn = ServerName.valueOf("localhost", 1000 + i, -1); 229 String backupZn = 230 ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupSn.toString()); 231 backupZNodes.add(backupZn); 232 MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234); 233 TEST_UTIL.waitFor(10000, 234 () -> activeMasterManager.getBackupMasters().size() == backupZNodes.size()); 235 } 236 // Remove backup masters 237 int numBackups = backupZNodes.size(); 238 for (String backupZNode : backupZNodes) { 239 ZKUtil.deleteNode(zk, backupZNode); 240 final int currentBackups = --numBackups; 241 TEST_UTIL.waitFor(10000, 242 () -> activeMasterManager.getBackupMasters().size() == currentBackups); 243 } 244 } 245 } 246 247 /** 248 * Assert there is an active master and that it has the specified address. 249 * @param zk single Zookeeper watcher 250 * @param expectedAddress the expected address of the master 251 * @throws KeeperException unexpected Zookeeper exception 252 * @throws IOException if an IO problem is encountered 253 */ 254 private void assertMaster(ZKWatcher zk, ServerName expectedAddress) 255 throws KeeperException, IOException { 256 ServerName readAddress = MasterAddressTracker.getMasterAddress(zk); 257 assertNotNull(readAddress); 258 assertEquals(expectedAddress, readAddress); 259 } 260 261 public static class WaitToBeMasterThread extends Thread { 262 263 ActiveMasterManager manager; 264 DummyMaster dummyMaster; 265 boolean isActiveMaster; 266 267 public WaitToBeMasterThread(ZKWatcher zk, ServerName address) throws InterruptedIOException { 268 this.dummyMaster = new DummyMaster(zk, address); 269 this.manager = this.dummyMaster.getActiveMasterManager(); 270 isActiveMaster = false; 271 } 272 273 @Override 274 public void run() { 275 manager.blockUntilBecomingActiveMaster(100, mockTaskGroup()); 276 LOG.info("Second master has become the active master!"); 277 isActiveMaster = true; 278 } 279 } 280 281 private static TaskGroup mockTaskGroup() { 282 TaskGroup taskGroup = Mockito.mock(TaskGroup.class); 283 MonitoredTask task = Mockito.mock(MonitoredTask.class); 284 when(taskGroup.addTask(any())).thenReturn(task); 285 return taskGroup; 286 } 287 288 public static class NodeDeletionListener extends ZKListener { 289 private static final Logger LOG = LoggerFactory.getLogger(NodeDeletionListener.class); 290 291 private Semaphore lock; 292 private String node; 293 294 public NodeDeletionListener(ZKWatcher watcher, String node) { 295 super(watcher); 296 lock = new Semaphore(0); 297 this.node = node; 298 } 299 300 @Override 301 public void nodeDeleted(String path) { 302 if (path.equals(node)) { 303 LOG.debug("nodeDeleted(" + path + ")"); 304 lock.release(); 305 } 306 } 307 308 public void waitForDeletion() throws InterruptedException { 309 lock.acquire(); 310 } 311 } 312 313 /** 314 * Dummy Master Implementation. 315 */ 316 public static class DummyMaster implements Server { 317 private volatile boolean stopped; 318 private ClusterStatusTracker clusterStatusTracker; 319 private ActiveMasterManager activeMasterManager; 320 321 public DummyMaster(ZKWatcher zk, ServerName master) throws InterruptedIOException { 322 this.clusterStatusTracker = new ClusterStatusTracker(zk, this); 323 clusterStatusTracker.start(); 324 325 this.activeMasterManager = new ActiveMasterManager(zk, master, this); 326 zk.registerListener(activeMasterManager); 327 } 328 329 @Override 330 public void abort(final String msg, final Throwable t) { 331 } 332 333 @Override 334 public boolean isAborted() { 335 return false; 336 } 337 338 @Override 339 public Configuration getConfiguration() { 340 return null; 341 } 342 343 @Override 344 public ZKWatcher getZooKeeper() { 345 return null; 346 } 347 348 @Override 349 public CoordinatedStateManager getCoordinatedStateManager() { 350 return null; 351 } 352 353 @Override 354 public ServerName getServerName() { 355 return null; 356 } 357 358 @Override 359 public boolean isStopped() { 360 return this.stopped; 361 } 362 363 @Override 364 public void stop(String why) { 365 this.stopped = true; 366 } 367 368 @Override 369 public ClusterConnection getConnection() { 370 return null; 371 } 372 373 public ClusterStatusTracker getClusterStatusTracker() { 374 return clusterStatusTracker; 375 } 376 377 public ActiveMasterManager getActiveMasterManager() { 378 return activeMasterManager; 379 } 380 381 @Override 382 public ChoreService getChoreService() { 383 return null; 384 } 385 386 @Override 387 public ClusterConnection getClusterConnection() { 388 // TODO Auto-generated method stub 389 return null; 390 } 391 392 @Override 393 public FileSystem getFileSystem() { 394 return null; 395 } 396 397 @Override 398 public boolean isStopping() { 399 return false; 400 } 401 402 @Override 403 public Connection createConnection(Configuration conf) throws IOException { 404 return null; 405 } 406 } 407}