001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.Mockito.when;
026
027import java.io.IOException;
028import java.io.InterruptedIOException;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.concurrent.Semaphore;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.hbase.ChoreService;
035import org.apache.hadoop.hbase.CoordinatedStateManager;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.Server;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.client.ClusterConnection;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.monitoring.MonitoredTask;
043import org.apache.hadoop.hbase.monitoring.TaskGroup;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
048import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
049import org.apache.hadoop.hbase.zookeeper.ZKListener;
050import org.apache.hadoop.hbase.zookeeper.ZKUtil;
051import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
052import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
053import org.apache.zookeeper.KeeperException;
054import org.junit.AfterClass;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.mockito.Mockito;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * Test the {@link ActiveMasterManager}.
065 */
066@Category({ MasterTests.class, MediumTests.class })
067public class TestActiveMasterManager {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestActiveMasterManager.class);
072
073  private final static Logger LOG = LoggerFactory.getLogger(TestActiveMasterManager.class);
074  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075
076  @BeforeClass
077  public static void setUpBeforeClass() throws Exception {
078    TEST_UTIL.startMiniZKCluster();
079  }
080
081  @AfterClass
082  public static void tearDownAfterClass() throws Exception {
083    TEST_UTIL.shutdownMiniZKCluster();
084  }
085
086  @Test
087  public void testRestartMaster() throws IOException, KeeperException {
088    try (ZKWatcher zk =
089      new ZKWatcher(TEST_UTIL.getConfiguration(), "testActiveMasterManagerFromZK", null, true)) {
090      try {
091        ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
092        ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
093      } catch (KeeperException.NoNodeException nne) {
094      }
095
096      // Create the master node with a dummy address
097      ServerName master = ServerName.valueOf("localhost", 1, EnvironmentEdgeManager.currentTime());
098      // Should not have a master yet
099      DummyMaster dummyMaster = new DummyMaster(zk, master);
100      ClusterStatusTracker clusterStatusTracker = dummyMaster.getClusterStatusTracker();
101      ActiveMasterManager activeMasterManager = dummyMaster.getActiveMasterManager();
102      assertFalse(activeMasterManager.clusterHasActiveMaster.get());
103      assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
104
105      // First test becoming the active master uninterrupted
106      TaskGroup status = mockTaskGroup();
107      clusterStatusTracker.setClusterUp();
108
109      activeMasterManager.blockUntilBecomingActiveMaster(100, status);
110      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
111      assertMaster(zk, master);
112      assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
113
114      // Now pretend master restart
115      DummyMaster secondDummyMaster = new DummyMaster(zk, master);
116      ActiveMasterManager secondActiveMasterManager = secondDummyMaster.getActiveMasterManager();
117      assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
118      activeMasterManager.blockUntilBecomingActiveMaster(100, status);
119      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
120      assertMaster(zk, master);
121      assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
122      assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get());
123    }
124  }
125
126  /**
127   * Unit tests that uses ZooKeeper but does not use the master-side methods but rather acts
128   * directly on ZK.
129   */
130  @Test
131  public void testActiveMasterManagerFromZK() throws Exception {
132    try (ZKWatcher zk =
133      new ZKWatcher(TEST_UTIL.getConfiguration(), "testActiveMasterManagerFromZK", null, true)) {
134      try {
135        ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
136        ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
137      } catch (KeeperException.NoNodeException nne) {
138      }
139
140      // Create the master node with a dummy address
141      ServerName firstMasterAddress =
142        ServerName.valueOf("localhost", 1, EnvironmentEdgeManager.currentTime());
143      ServerName secondMasterAddress =
144        ServerName.valueOf("localhost", 2, EnvironmentEdgeManager.currentTime());
145
146      // Should not have a master yet
147      DummyMaster ms1 = new DummyMaster(zk, firstMasterAddress);
148      ActiveMasterManager activeMasterManager = ms1.getActiveMasterManager();
149      assertFalse(activeMasterManager.clusterHasActiveMaster.get());
150      assertFalse(activeMasterManager.getActiveMasterServerName().isPresent());
151
152      // First test becoming the active master uninterrupted
153      ClusterStatusTracker clusterStatusTracker = ms1.getClusterStatusTracker();
154      clusterStatusTracker.setClusterUp();
155
156      activeMasterManager.blockUntilBecomingActiveMaster(100, mockTaskGroup());
157      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
158      assertMaster(zk, firstMasterAddress);
159      assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
160
161      // New manager will now try to become the active master in another thread
162      WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
163      t.start();
164      // Wait for this guy to figure out there is another active master
165      // Wait for 1 second at most
166      int sleeps = 0;
167      while (!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
168        Thread.sleep(10);
169        sleeps++;
170      }
171
172      // Both should see that there is an active master
173      assertTrue(activeMasterManager.clusterHasActiveMaster.get());
174      assertTrue(t.manager.clusterHasActiveMaster.get());
175      // But secondary one should not be the active master
176      assertFalse(t.isActiveMaster);
177      // Verify the active master ServerName is populated in standby master.
178      assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName().get());
179
180      // Close the first server and delete it's master node
181      ms1.stop("stopping first server");
182
183      // Use a listener to capture when the node is actually deleted
184      NodeDeletionListener listener =
185        new NodeDeletionListener(zk, zk.getZNodePaths().masterAddressZNode);
186      zk.registerListener(listener);
187
188      LOG.info("Deleting master node");
189      ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
190
191      // Wait for the node to be deleted
192      LOG.info("Waiting for active master manager to be notified");
193      listener.waitForDeletion();
194      LOG.info("Master node deleted");
195
196      // Now we expect the secondary manager to have and be the active master
197      // Wait for 1 second at most
198      sleeps = 0;
199      while (!t.isActiveMaster && sleeps < 100) {
200        Thread.sleep(10);
201        sleeps++;
202      }
203      LOG.debug("Slept " + sleeps + " times");
204
205      assertTrue(t.manager.clusterHasActiveMaster.get());
206      assertTrue(t.isActiveMaster);
207      assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName().get());
208
209      LOG.info("Deleting master node");
210
211      ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
212    }
213  }
214
215  @Test
216  public void testBackupMasterUpdates() throws Exception {
217    Configuration conf = TEST_UTIL.getConfiguration();
218    try (ZKWatcher zk = new ZKWatcher(conf, "testBackupMasterUpdates", null, true)) {
219      ServerName sn1 = ServerName.valueOf("localhost", 1, -1);
220      DummyMaster master1 = new DummyMaster(zk, sn1);
221      ActiveMasterManager activeMasterManager = master1.getActiveMasterManager();
222      activeMasterManager.blockUntilBecomingActiveMaster(100, mockTaskGroup());
223      assertEquals(sn1, activeMasterManager.getActiveMasterServerName().get());
224      assertEquals(0, activeMasterManager.getBackupMasters().size());
225      // Add backup masters
226      List<String> backupZNodes = new ArrayList<>();
227      for (int i = 1; i <= 10; i++) {
228        ServerName backupSn = ServerName.valueOf("localhost", 1000 + i, -1);
229        String backupZn =
230          ZNodePaths.joinZNode(zk.getZNodePaths().backupMasterAddressesZNode, backupSn.toString());
231        backupZNodes.add(backupZn);
232        MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234);
233        TEST_UTIL.waitFor(10000,
234          () -> activeMasterManager.getBackupMasters().size() == backupZNodes.size());
235      }
236      // Remove backup masters
237      int numBackups = backupZNodes.size();
238      for (String backupZNode : backupZNodes) {
239        ZKUtil.deleteNode(zk, backupZNode);
240        final int currentBackups = --numBackups;
241        TEST_UTIL.waitFor(10000,
242          () -> activeMasterManager.getBackupMasters().size() == currentBackups);
243      }
244    }
245  }
246
247  /**
248   * Assert there is an active master and that it has the specified address.
249   * @param zk              single Zookeeper watcher
250   * @param expectedAddress the expected address of the master
251   * @throws KeeperException unexpected Zookeeper exception
252   * @throws IOException     if an IO problem is encountered
253   */
254  private void assertMaster(ZKWatcher zk, ServerName expectedAddress)
255    throws KeeperException, IOException {
256    ServerName readAddress = MasterAddressTracker.getMasterAddress(zk);
257    assertNotNull(readAddress);
258    assertEquals(expectedAddress, readAddress);
259  }
260
261  public static class WaitToBeMasterThread extends Thread {
262
263    ActiveMasterManager manager;
264    DummyMaster dummyMaster;
265    boolean isActiveMaster;
266
267    public WaitToBeMasterThread(ZKWatcher zk, ServerName address) throws InterruptedIOException {
268      this.dummyMaster = new DummyMaster(zk, address);
269      this.manager = this.dummyMaster.getActiveMasterManager();
270      isActiveMaster = false;
271    }
272
273    @Override
274    public void run() {
275      manager.blockUntilBecomingActiveMaster(100, mockTaskGroup());
276      LOG.info("Second master has become the active master!");
277      isActiveMaster = true;
278    }
279  }
280
281  private static TaskGroup mockTaskGroup() {
282    TaskGroup taskGroup = Mockito.mock(TaskGroup.class);
283    MonitoredTask task = Mockito.mock(MonitoredTask.class);
284    when(taskGroup.addTask(any())).thenReturn(task);
285    return taskGroup;
286  }
287
288  public static class NodeDeletionListener extends ZKListener {
289    private static final Logger LOG = LoggerFactory.getLogger(NodeDeletionListener.class);
290
291    private Semaphore lock;
292    private String node;
293
294    public NodeDeletionListener(ZKWatcher watcher, String node) {
295      super(watcher);
296      lock = new Semaphore(0);
297      this.node = node;
298    }
299
300    @Override
301    public void nodeDeleted(String path) {
302      if (path.equals(node)) {
303        LOG.debug("nodeDeleted(" + path + ")");
304        lock.release();
305      }
306    }
307
308    public void waitForDeletion() throws InterruptedException {
309      lock.acquire();
310    }
311  }
312
313  /**
314   * Dummy Master Implementation.
315   */
316  public static class DummyMaster implements Server {
317    private volatile boolean stopped;
318    private ClusterStatusTracker clusterStatusTracker;
319    private ActiveMasterManager activeMasterManager;
320
321    public DummyMaster(ZKWatcher zk, ServerName master) throws InterruptedIOException {
322      this.clusterStatusTracker = new ClusterStatusTracker(zk, this);
323      clusterStatusTracker.start();
324
325      this.activeMasterManager = new ActiveMasterManager(zk, master, this);
326      zk.registerListener(activeMasterManager);
327    }
328
329    @Override
330    public void abort(final String msg, final Throwable t) {
331    }
332
333    @Override
334    public boolean isAborted() {
335      return false;
336    }
337
338    @Override
339    public Configuration getConfiguration() {
340      return null;
341    }
342
343    @Override
344    public ZKWatcher getZooKeeper() {
345      return null;
346    }
347
348    @Override
349    public CoordinatedStateManager getCoordinatedStateManager() {
350      return null;
351    }
352
353    @Override
354    public ServerName getServerName() {
355      return null;
356    }
357
358    @Override
359    public boolean isStopped() {
360      return this.stopped;
361    }
362
363    @Override
364    public void stop(String why) {
365      this.stopped = true;
366    }
367
368    @Override
369    public ClusterConnection getConnection() {
370      return null;
371    }
372
373    public ClusterStatusTracker getClusterStatusTracker() {
374      return clusterStatusTracker;
375    }
376
377    public ActiveMasterManager getActiveMasterManager() {
378      return activeMasterManager;
379    }
380
381    @Override
382    public ChoreService getChoreService() {
383      return null;
384    }
385
386    @Override
387    public ClusterConnection getClusterConnection() {
388      // TODO Auto-generated method stub
389      return null;
390    }
391
392    @Override
393    public FileSystem getFileSystem() {
394      return null;
395    }
396
397    @Override
398    public boolean isStopping() {
399      return false;
400    }
401
402    @Override
403    public Connection createConnection(Configuration conf) throws IOException {
404      return null;
405    }
406  }
407}