/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

@Category(LargeTests.class)
public class TestRegionServerReportForDuty {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);

  private static final long SLEEP_INTERVAL = 500;

  private HBaseTestingUtility testUtil;
  private LocalHBaseCluster cluster;
  private RegionServerThread rs;
  private RegionServerThread rs2;
  private MasterThread master;
  private MasterThread backupMaster;
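  /**
   * Sets up the shared test fixture: a single-node DFS and ZK mini cluster plus an empty
   * LocalHBaseCluster to which each test adds the masters and region servers it needs.
   */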
  @Before
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtility();
    testUtil.startMiniDFSCluster(1);
    testUtil.startMiniZKCluster(1);
    testUtil.createRootDir();
    cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
  }

  @After
  public void tearDown() throws Exception {
    cluster.shutdown();
    cluster.join();
    testUtil.shutdownMiniZKCluster();
    testUtil.shutdownMiniDFSCluster();
  }

  private static class LogCapturer {
    private StringWriter sw = new StringWriter();
    private org.apache.logging.log4j.core.appender.WriterAppender appender;
    private org.apache.logging.log4j.core.Logger logger;

    LogCapturer(org.apache.logging.log4j.core.Logger logger) {
      this.logger = logger;
      this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
        .setName("test").setTarget(sw).build();
      this.logger.addAppender(this.appender);
    }

    String getOutput() {
      return sw.toString();
    }

    public void stopCapturing() {
      this.logger.removeAppender(this.appender);
    }
  }

  /**
   * This test HMaster class always throws ServerNotRunningYetException when its service-started
   * state is checked.
   */
  public static class NeverInitializedMaster extends HMaster {
    public NeverInitializedMaster(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    protected void checkServiceStarted() throws ServerNotRunningYetException {
      throw new ServerNotRunningYetException("Server is not running yet");
    }
  }

  /**
   * Tests that the region server backs off on reportForDuty while the master is not ready.
   */
  @Test
  public void testReportForDutyBackoff() throws IOException, InterruptedException {
    cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
    master = cluster.addMaster();
    master.start();

    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));
    // Set sleep interval relatively low so that exponential backoff is more demanding.
    int msginterval = 100;
    cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
    rs = cluster.addRegionServer();
    rs.start();

    int interval = 10_000;
    Thread.sleep(interval);
    capturer.stopCapturing();
    String output = capturer.getOutput();
    LOG.info("{}", output);
    String failMsg = "reportForDuty failed;";
    int count = StringUtils.countMatches(output, failMsg);

    // The following asserts that the actual retry count is in the range
    // (expectedRetry/2, expectedRetry*2). Ideally we could assert the exact retry count;
    // we relax the bounds here to tolerate contention.
    int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
    assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count,
      expectedRetry / 2), count > expectedRetry / 2);
    assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count,
      expectedRetry * 2), count < expectedRetry * 2);
  }

  /**
   * Tests region server reportForDuty when the backup master becomes the primary master after the
   * first master goes away.
   */
  @Test
  public void testReportForDutyWithMasterChange() throws Exception {
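    // Overall flow: bring up a master and one region server, add a MyRegionServer whose
    // reportForDuty blocks until it observes a master switch, stop the first master, then start a
    // backup master and verify that both region servers report in to it.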
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // When tables are allowed on the master, the master itself also hosts a region server, so
    // defaultMinToStart = 2 in that case.
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 2 : 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 2 : 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();

    waitForClusterOnline(master);

    // Add a 2nd region server
    cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    rs2 = cluster.addRegionServer();
    // Start the region server. This region server will refresh its RPC connection from the
    // current active master to the next active master before completing reportForDuty.
    LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
    rs2.start();

    waitForSecondRsStarted();

    // Stop the current master.
    master.getMaster().stop("Stopping master");

    // Start a new master on another random unique port.
    // Also let it wait for exactly 2 region servers to report in.
    // TODO: Handle BindException. A random port is not enough and makes this test flaky!
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 3 : 2);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 3 : 2);
    backupMaster = cluster.addMaster();
    LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
    backupMaster.start();

    waitForClusterOnline(backupMaster);

    // Do some checking/asserts here.
    assertTrue(backupMaster.getMaster().isActiveMaster());
    assertTrue(backupMaster.getMaster().isInitialized());
    assertEquals(tablesOnMaster ? 3 : 2,
      backupMaster.getMaster().getServerManager().getOnlineServersList().size());
  }

  /**
   * Tests region server reportForDuty with RS RPC retry.
   */
  @Test
  public void testReportForDutyWithRSRpcRetry() throws Exception {
    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Override the default RS RPC retry interval of 100ms to 300ms
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
    // When tables are allowed on the master, the master itself also hosts a region server, so
    // defaultMinToStart = 2 in that case.
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 2 : 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 2 : 1);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    // Delay the RS start so that the meta assignment fails on the first attempt and goes to the
    // retry block.
    scheduledThreadPoolExecutor.schedule(new Runnable() {
      @Override
      public void run() {
        rs.start();
      }
    }, 1000, TimeUnit.MILLISECONDS);

    waitForClusterOnline(master);
  }

  /**
   * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
   * configured to reject decommissioned hosts and when there is a match for the joining
   * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
   */
  @Test
  public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
    throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);

    // Set the cluster to reject decommissioned hosts
    cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);

    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    master.start();
    rs.start();
    waitForClusterOnline(master);

    // Add a second, decommissioned region server to the cluster and wait for it to fail
    // reportForDuty.
    LogCapturer capturer =
      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
        .getLogger(HRegionServer.class));

    rs2 = cluster.addRegionServer();
    master.getMaster().decommissionRegionServers(
      Collections.singletonList(rs2.getRegionServer().getServerName()), false);
    rs2.start();

    // Assert that the second region server has aborted
    testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
      new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));

    // Assert that the log messages for DecommissionedHostRejectedException exist in the logs
    capturer.stopCapturing();

    assertThat(capturer.getOutput(),
      containsString("Master rejected startup because the host is considered decommissioned"));

    /**
     * Assert that the following log message occurred (one line):
     * "org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException:
     * org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: Host localhost exists in
     * the list of decommissioned servers and Master is configured to reject decommissioned hosts"
     */
    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
      hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
        containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
          + " exists in the list of decommissioned servers and Master is configured to reject"
          + " decommissioned hosts"))));

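    // Assert that the abort log line for the second region server mentions the
    // DecommissionedHostRejectedException and the rejected host.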
    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
      hasItem(
        allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
          containsString("Unhandled"),
          containsString(DecommissionedHostRejectedException.class.getSimpleName()),
          containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
            + " exists in the list of decommissioned servers and Master is configured to reject"
            + " decommissioned hosts"))));
  }

  /**
   * Tests region server reportForDuty with a non-default environment edge.
   */
  @Test
  public void testReportForDutyWithEnvironmentEdge() throws Exception {
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port
    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtility.randomFreePort());
    // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
    cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
    cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);

    // When tables are allowed on the master, the master itself also hosts a region server, so
    // defaultMinToStart = 2 in that case.
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(testUtil.getConfiguration());
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
      tablesOnMaster ? 2 : 1);
    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART,
      tablesOnMaster ? 2 : 1);

    // Inject non-default environment edge
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);
    master = cluster.addMaster();
    rs = cluster.addRegionServer();
    LOG.debug("Starting master: " + master.getMaster().getServerName());
    master.start();
    rs.start();
    waitForClusterOnline(master);
  }

  private void waitForClusterOnline(MasterThread master) throws InterruptedException {
    while (true) {
      if (master.getMaster().isInitialized()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for master to come online ...");
    }
    rs.waitForServerOnline();
  }

  private void waitForSecondRsStarted() throws InterruptedException {
    while (true) {
      if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag()) {
        break;
      }
      Thread.sleep(SLEEP_INTERVAL);
      LOG.debug("Waiting for the 2nd RS to be started ...");
    }
  }

  // A region server that provides a hook so that we can wait for the master switch-over before
  // continuing reportForDuty to the master.
  // The idea is that we get an RPC connection to the first active master, then we wait.
  // The first master goes down, the second master becomes the active master. The region
  // server continues reportForDuty. It should succeed with the new master.
  public static class MyRegionServer extends MiniHBaseClusterRegionServer {

    private ServerName sn;
    // This flag is to make sure this rs has obtained the rpcStub to the first master.
    // The first master will go down after this.
    private boolean rpcStubCreatedFlag = false;
    private boolean masterChanged = false;

    public MyRegionServer(Configuration conf)
      throws IOException, KeeperException, InterruptedException {
      super(conf);
    }

    @Override
    protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
      sn = super.createRegionServerStatusStub(refresh);
      rpcStubCreatedFlag = true;

      // Wait for master switch over. Only do this for the second region server.
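      // Poll the master address tracker until it reports a master other than the one this stub
      // was created against, i.e. until the backup master has taken over.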
      while (!masterChanged) {
        ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
        if (newSn != null && !newSn.equals(sn)) {
          masterChanged = true;
          break;
        }
        try {
          Thread.sleep(SLEEP_INTERVAL);
        } catch (InterruptedException e) {
          return null;
        }
        LOG.debug("Waiting for master switch over ... ");
      }
      return sn;
    }

    public boolean getRpcStubCreatedFlag() {
      return rpcStubCreatedFlag;
    }
  }
}