001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.Matchers.allOf; 022import static org.hamcrest.Matchers.hasItem; 023import static org.hamcrest.Matchers.is; 024import static org.junit.Assert.*; 025 026import java.io.IOException; 027import java.io.StringWriter; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.concurrent.ScheduledThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import org.apache.commons.lang3.StringUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.LocalHBaseCluster; 038import org.apache.hadoop.hbase.MatcherPredicate; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; 041import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; 042import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 043import org.apache.hadoop.hbase.master.HMaster; 044import org.apache.hadoop.hbase.master.ServerManager; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; 048import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 049import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.zookeeper.KeeperException; 052import org.junit.After; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062@Category(LargeTests.class) 063public class TestRegionServerReportForDuty { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class); 068 069 private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class); 070 071 private static final long SLEEP_INTERVAL = 500; 072 073 private HBaseTestingUtil testUtil; 074 private LocalHBaseCluster cluster; 075 private RegionServerThread rs; 076 private RegionServerThread rs2; 077 private MasterThread master; 078 private MasterThread backupMaster; 079 080 @Before 081 public void setUp() throws Exception { 082 testUtil = new HBaseTestingUtil(); 083 testUtil.startMiniDFSCluster(1); 084 testUtil.startMiniZKCluster(1); 085 testUtil.createRootDir(); 086 cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); 087 } 088 089 @After 090 public void tearDown() throws Exception { 091 cluster.shutdown(); 092 cluster.join(); 093 testUtil.shutdownMiniZKCluster(); 094 testUtil.shutdownMiniDFSCluster(); 095 } 096 097 private static class LogCapturer { 098 private StringWriter sw = new StringWriter(); 099 private org.apache.logging.log4j.core.appender.WriterAppender appender; 100 private org.apache.logging.log4j.core.Logger logger; 101 102 LogCapturer(org.apache.logging.log4j.core.Logger logger) { 103 this.logger = logger; 104 this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder() 105 .setName("test").setTarget(sw).build(); 106 this.logger.addAppender(this.appender); 107 } 108 109 String getOutput() { 110 return sw.toString(); 111 } 112 113 public void stopCapturing() { 114 this.logger.removeAppender(this.appender); 115 } 116 } 117 118 /** 119 * This test HMaster class will always throw ServerNotRunningYetException if checked. 120 */ 121 public static class NeverInitializedMaster extends HMaster { 122 public NeverInitializedMaster(Configuration conf) throws IOException { 123 super(conf); 124 } 125 126 @Override 127 protected void checkServiceStarted() throws ServerNotRunningYetException { 128 throw new ServerNotRunningYetException("Server is not running yet"); 129 } 130 } 131 132 /** 133 * Tests region server should backoff to report for duty if master is not ready. 134 */ 135 @Test 136 public void testReportForDutyBackoff() throws IOException, InterruptedException { 137 cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName()); 138 master = cluster.addMaster(); 139 master.start(); 140 141 LogCapturer capturer = 142 new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 143 .getLogger(HRegionServer.class)); 144 // Set sleep interval relatively low so that exponential backoff is more demanding. 145 int msginterval = 100; 146 cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval); 147 rs = cluster.addRegionServer(); 148 rs.start(); 149 150 int interval = 10_000; 151 Thread.sleep(interval); 152 capturer.stopCapturing(); 153 String output = capturer.getOutput(); 154 LOG.info("{}", output); 155 String failMsg = "reportForDuty failed;"; 156 int count = StringUtils.countMatches(output, failMsg); 157 158 // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2). 159 // Ideally we can assert the exact retry count. We relax here to tolerate contention error. 160 int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval)); 161 assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count, 162 expectedRetry / 2), count > expectedRetry / 2); 163 assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count, 164 expectedRetry * 2), count < expectedRetry * 2); 165 } 166 167 /** 168 * Tests region sever reportForDuty with backup master becomes primary master after the first 169 * master goes away. 170 */ 171 @Test 172 public void testReportForDutyWithMasterChange() throws Exception { 173 174 // Start a master and wait for it to become the active/primary master. 175 // Use a random unique port 176 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 177 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 178 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 179 master = cluster.addMaster(); 180 rs = cluster.addRegionServer(); 181 LOG.debug("Starting master: " + master.getMaster().getServerName()); 182 master.start(); 183 rs.start(); 184 185 waitForClusterOnline(master); 186 187 // Add a 2nd region server 188 cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 189 rs2 = cluster.addRegionServer(); 190 // Start the region server. This region server will refresh RPC connection 191 // from the current active master to the next active master before completing 192 // reportForDuty 193 LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); 194 rs2.start(); 195 196 waitForSecondRsStarted(); 197 198 // Stop the current master. 199 master.getMaster().stop("Stopping master"); 200 201 // Start a new master and use another random unique port 202 // Also let it wait for exactly 2 region severs to report in. 203 // TODO: Add handling bindexception. Random port is not enough!!! Flakie test! 204 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 205 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2); 206 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2); 207 backupMaster = cluster.addMaster(); 208 LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); 209 backupMaster.start(); 210 211 waitForClusterOnline(backupMaster); 212 213 // Do some checking/asserts here. 214 assertTrue(backupMaster.getMaster().isActiveMaster()); 215 assertTrue(backupMaster.getMaster().isInitialized()); 216 assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2); 217 218 } 219 220 /** 221 * Tests region sever reportForDuty with RS RPC retry 222 */ 223 @Test 224 public void testReportForDutyWithRSRpcRetry() throws Exception { 225 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, 226 new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true) 227 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 228 229 // Start a master and wait for it to become the active/primary master. 230 // Use a random unique port 231 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 232 // Override the default RS RPC retry interval of 100ms to 300ms 233 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300); 234 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 235 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 236 master = cluster.addMaster(); 237 rs = cluster.addRegionServer(); 238 LOG.debug("Starting master: " + master.getMaster().getServerName()); 239 master.start(); 240 // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block 241 scheduledThreadPoolExecutor.schedule(new Runnable() { 242 @Override 243 public void run() { 244 rs.start(); 245 } 246 }, 1000, TimeUnit.MILLISECONDS); 247 248 waitForClusterOnline(master); 249 } 250 251 /** 252 * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is 253 * configured to reject decommissioned hosts and when there is a match for the joining 254 * RegionServer in the list of decommissioned servers. Test case for HBASE-28342. 255 */ 256 @Test 257 public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts() 258 throws Exception { 259 // Start a master and wait for it to become the active/primary master. 260 // Use a random unique port 261 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 262 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 263 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 264 265 // Set the cluster to reject decommissioned hosts 266 cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true); 267 268 master = cluster.addMaster(); 269 rs = cluster.addRegionServer(); 270 master.start(); 271 rs.start(); 272 waitForClusterOnline(master); 273 274 // Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty 275 LogCapturer capturer = 276 new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager 277 .getLogger(HRegionServer.class)); 278 279 rs2 = cluster.addRegionServer(); 280 master.getMaster().decommissionRegionServers( 281 Collections.singletonList(rs2.getRegionServer().getServerName()), false); 282 rs2.start(); 283 284 // Assert that the second regionserver has aborted 285 testUtil.waitFor(TimeUnit.SECONDS.toMillis(90), 286 new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true))); 287 288 // Assert that the log messages for DecommissionedHostRejectedException exist in the logs 289 capturer.stopCapturing(); 290 291 assertThat(capturer.getOutput(), 292 containsString("Master rejected startup because the host is considered decommissioned")); 293 294 /** 295 * Assert that the following log message occurred (one line): 296 * "org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: 297 * org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException: Host localhost exists in the 298 * list of decommissioned servers and Master is configured to reject decommissioned hosts" 299 */ 300 assertThat(Arrays.asList(capturer.getOutput().split("\n")), 301 hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()), 302 containsString(DecommissionedHostRejectedException.class.getSimpleName()), 303 containsString("Host " + rs2.getRegionServer().getServerName().getHostname() 304 + " exists in the list of decommissioned servers and Master is configured to reject" 305 + " decommissioned hosts")))); 306 307 assertThat(Arrays.asList(capturer.getOutput().split("\n")), 308 hasItem( 309 allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()), 310 containsString("Unhandled"), 311 containsString(DecommissionedHostRejectedException.class.getSimpleName()), 312 containsString("Host " + rs2.getRegionServer().getServerName().getHostname() 313 + " exists in the list of decommissioned servers and Master is configured to reject" 314 + " decommissioned hosts")))); 315 } 316 317 /** 318 * Tests region sever reportForDuty with a non-default environment edge 319 */ 320 @Test 321 public void testReportForDutyWithEnvironmentEdge() throws Exception { 322 // Start a master and wait for it to become the active/primary master. 323 // Use a random unique port 324 cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); 325 // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately 326 cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0); 327 cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0); 328 329 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); 330 cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); 331 332 // Inject non-default environment edge 333 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); 334 EnvironmentEdgeManager.injectEdge(edge); 335 master = cluster.addMaster(); 336 rs = cluster.addRegionServer(); 337 LOG.debug("Starting master: " + master.getMaster().getServerName()); 338 master.start(); 339 rs.start(); 340 waitForClusterOnline(master); 341 } 342 343 private void waitForClusterOnline(MasterThread master) throws InterruptedException { 344 while (true) { 345 if (master.getMaster().isInitialized()) { 346 break; 347 } 348 Thread.sleep(SLEEP_INTERVAL); 349 LOG.debug("Waiting for master to come online ..."); 350 } 351 rs.waitForServerOnline(); 352 } 353 354 private void waitForSecondRsStarted() throws InterruptedException { 355 while (true) { 356 if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { 357 break; 358 } 359 Thread.sleep(SLEEP_INTERVAL); 360 LOG.debug("Waiting 2nd RS to be started ..."); 361 } 362 } 363 364 // Create a Region Server that provide a hook so that we can wait for the master switch over 365 // before continuing reportForDuty to the mater. 366 // The idea is that we get a RPC connection to the first active master, then we wait. 367 // The first master goes down, the second master becomes the active master. The region 368 // server continues reportForDuty. It should succeed with the new master. 369 public static class MyRegionServer extends MiniHBaseClusterRegionServer { 370 371 private ServerName sn; 372 // This flag is to make sure this rs has obtained the rpcStub to the first master. 373 // The first master will go down after this. 374 private boolean rpcStubCreatedFlag = false; 375 private boolean masterChanged = false; 376 377 public MyRegionServer(Configuration conf) 378 throws IOException, KeeperException, InterruptedException { 379 super(conf); 380 } 381 382 @Override 383 protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { 384 sn = super.createRegionServerStatusStub(refresh); 385 rpcStubCreatedFlag = true; 386 387 // Wait for master switch over. Only do this for the second region server. 388 while (!masterChanged) { 389 ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); 390 if (newSn != null && !newSn.equals(sn)) { 391 masterChanged = true; 392 break; 393 } 394 try { 395 Thread.sleep(SLEEP_INTERVAL); 396 } catch (InterruptedException e) { 397 return null; 398 } 399 LOG.debug("Waiting for master switch over ... "); 400 } 401 return sn; 402 } 403 404 public boolean getRpcStubCreatedFlag() { 405 return rpcStubCreatedFlag; 406 } 407 } 408}