/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

@Category(LargeTests.class)
public class TestRegionServerReportForDuty {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);

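  // Polling interval, in milliseconds, used by the wait loops in this test.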
  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtility testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;

  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtility();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
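    // Create the cluster shell with zero masters and zero region servers; each test adds the
    // daemons it needs.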
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  @After
  public void tearDown() throws Exception {
    cluster.shutdown();
    cluster.join();
    testUtil.shutdownMiniZKCluster();
    testUtil.shutdownMiniDFSCluster();
  }

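  /**
   * Captures output written to a log4j2 logger in an in-memory {@link StringWriter} so that tests
   * can assert on the emitted log messages.
   */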
  private static class LogCapturer {
    private StringWriter sw = new StringWriter();
    private org.apache.logging.log4j.core.appender.WriterAppender appender;
    private org.apache.logging.log4j.core.Logger logger;

    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
      this.logger = logger;
      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
        .setName("test").setTarget(sw).build();
      this.logger.addAppender(this.appender);
    }

    String getOutput() {
      return sw.toString();
    }

    public void stopCapturing() {
      this.logger.removeAppender(this.appender);
    }
  }

  /**
   * Test HMaster implementation that always throws {@link ServerNotRunningYetException} when its
   * service-started state is checked.
   */
  public static class NeverInitializedMaster extends HMaster {
    public NeverInitializedMaster(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected void checkServiceStarted() throws ServerNotRunningYetException {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }

  /**
   * Tests that the region server backs off between reportForDuty attempts while the master is not
   * ready.
   */
  @Test
  public void testReportForDutyBackoff() throws IOException, InterruptedException {
    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
    master = cluster.addMaster();
    master.start();

    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));
    // Set the report interval relatively low so that exponential backoff is exercised more.
    int msginterval = 100;
    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
    rs = cluster.addRegionServer();
    rs.start();

    int interval = 10_000;
    Thread.sleep(interval);
    capturer.stopCapturing();
    String output = capturer.getOutput();
    LOG.info("{}", output);
    String failMsg = "reportForDuty failed;";
    int count = StringUtils.countMatches(output, failMsg);

    // The following asserts that the actual retry count is in the range
    // (expectedRetry/2, expectedRetry*2). Ideally we would assert the exact retry count; we relax
    // the bounds here to tolerate scheduling jitter.
    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count,
      expectedRetry / 2), count > expectedRetry / 2);
    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count,
      expectedRetry * 2), count < expectedRetry * 2);
  }

  /**
   * Tests region server reportForDuty when a backup master becomes the primary master after the
   * first master goes away.
   */
  @Test
  public void testReportForDutyWithMasterChange() throws Exception {

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 2 : 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 2 : 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);

    // Add a 2nd region server
    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh its RPC connection from the
    // current active master to the next active master before completing reportForDuty.
    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
    rs2.start();

    waitForSecondRsStarted();

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master on another random unique port and let it wait for exactly 2 region
    // servers to report in.
    // TODO: Handle BindException; a random port alone is not enough and can make this test flaky.
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 3 : 2);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 3 : 2);
    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // Do some checking/asserts here.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    assertEquals(tablesOnMaster ? 3 : 2,
      backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Tests region server reportForDuty with RS RPC retry.
   */
  @Test
  public void testReportForDutyWithRSRpcRetry() throws Exception {
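    // Single-threaded scheduler used below to start the region server after a short delay.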
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Override the default RS RPC retry interval of 100ms to 300ms
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 2 : 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 2 : 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    // Delay the RS start so that the meta assignment fails on the first attempt and goes to the
    // retry block.
    scheduledThreadPoolExecutor.schedule(() -> rs.start(), 1000, TimeUnit.MILLISECONDS);

    waitForClusterOnline(master);
  }

  /**
   * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
   * configured to reject decommissioned hosts and when there is a match for the joining
   * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
   */
  @Test
  public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
    throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

    // Set the cluster to reject decommissioned hosts
    cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);

    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    master.start();
    rs.start();
    waitForClusterOnline(master);

    // Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty
    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));

    rs2 = cluster.addRegionServer();
    master.getMaster().decommissionRegionServers(
      Collections.singletonList(rs2.getRegionServer().getServerName()), false);
    rs2.start();

    // Assert that the second regionserver has aborted
    testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
      new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));

    // Assert that the log messages for DecommissionedHostRejectedException exist in the logs
    capturer.stopCapturing();

    assertThat(capturer.getOutput(),
      containsString("Master rejected startup because the host is considered decommissioned"));

    /**
     * Assert that the following log message occurred (one line):
     * "org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException:
     * org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: Host localhost exists in the
     * list of decommissioned servers and Master is configured to reject decommissioned hosts"
     */
    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
      hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
        containsString(DecommissionedHostRejectedException.class.getSimpleName()),
        containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
          + " exists in the list of decommissioned servers and Master is configured to reject"
          + " decommissioned hosts"))));

    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
      hasItem(
        allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
          containsString("Unhandled"),
          containsString(DecommissionedHostRejectedException.class.getSimpleName()),
          containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
            + " exists in the list of decommissioned servers and Master is configured to reject"
            + " decommissioned hosts"))));
  }

  /**
   * Tests region server reportForDuty with a non-default environment edge.
   */
  @Test
  public void testReportForDutyWithEnvironmentEdge() throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);

    // master has a rs. defaultMinToStart = 2
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 2 : 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 2 : 1);

    // Inject non-default environment edge
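    // The incrementing edge advances its reported time on every currentTime() call, so the
    // report-for-duty path is exercised against a clock that always moves forward.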
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();
    waitForClusterOnline(master);
  }

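  /**
   * Blocks until the given master reports that it is initialized, then waits for this test's first
   * region server to come online.
   */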
  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
    while (true) {
      if (master.getMaster().isInitialized()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

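  /**
   * Blocks until the second region server has obtained its RPC stub to the active master.
   */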
  private void waitForSecondRsStarted() throws InterruptedException {
    while (true) {
      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for the 2nd RS to start ...");
    }
  }

  // Create a region server that provides a hook so that we can wait for the master switchover
  // before continuing reportForDuty to the master.
  // The idea is that we get an RPC connection to the first active master, then we wait.
  // The first master goes down and the second master becomes the active master. The region
  // server continues reportForDuty. It should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    private boolean rpcStubCreatedFlag = false;
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf)
      throws IOException, KeeperException, InterruptedException {
      super(conf);
    }

    @Override
    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
      sn = super.createRegionServerStatusStub(refresh);
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}