/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;
import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT_WAIT_INTERVAL;
import static org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that, after a restart, every region is recorded in meta on the same server address it
 * was on before the restart. Three scenarios are covered: restarting the whole cluster,
 * restarting the master together with one region server, and restarting one region server while
 * the master keeps running with forced region retainment enabled.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRetainAssignmentOnRestart.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class);

  /** Number of region servers the mini cluster is started with. */
  private static final int NUM_OF_RS = 3;

  /**
   * Master that postpones starting its procedure executor until {@link #NUM_OF_RS} destination
   * servers are available, so that no procedure runs before every region server can take
   * regions.
   */
  public static final class HMasterForTest extends HMaster {

    public HMasterForTest(Configuration conf) throws IOException {
      super(conf);
    }

    /**
     * Returns immediately; a background thread polls once per second and starts the real
     * procedure executor as soon as the destination server list has {@link #NUM_OF_RS} entries.
     */
    @Override
    protected void startProcedureExecutor() throws IOException {
      // only start procedure executor when we have all the regionservers ready to take regions
      Thread starter = new Thread(() -> {
        while (getServerManager().createDestinationServersList().size() != NUM_OF_RS) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            // Keep the interrupt visible and stop polling instead of looping forever.
            Thread.currentThread().interrupt();
            TestRetainAssignmentOnRestart.LOG
              .warn("Interrupted while waiting for region servers; procedure executor not started");
            return;
          }
        }
        try {
          HMasterForTest.super.startProcedureExecutor();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }, "HMasterForTest-ProcedureExecutorStarter");
      // A poller that never sees enough region servers must not keep the test JVM alive.
      starter.setDaemon(true);
      starter.start();
    }
  }

  @Override
  protected boolean splitWALCoordinatedByZk() {
    return true;
  }

  /**
   * This tests retaining assignments on a cluster restart
   */
  @Test
  public void testRetainAssignmentOnClusterRestart() throws Exception {
    setupCluster();
    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
    int[] rsPorts = getRegionServerPorts(cluster.getLiveRegionServerThreads());

    Map<RegionInfo, ServerName> regionToRegionServerMap = loadAssignmentsFromMeta(master);
    assertHostedOnKnownPorts(regionToRegionServerMap, rsPorts);

    LOG.info("\n\nShutting down HBase cluster");
    cluster.stopMaster(0);
    cluster.shutdown();
    cluster.waitUntilShutDown();

    LOG.info("\n\nSleeping a bit");
    Thread.sleep(2000);

    LOG.info("\n\nStarting cluster the second time with the same ports");
    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_OF_RS);
    master = cluster.startMaster().getMaster();
    for (int port : rsPorts) {
      cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, port);
      cluster.startRegionServer();
    }

    ensureServersWithSamePort(master, rsPorts);
    waitForAllRegionsAssigned();

    // Every region server was restarted, so every start code must have changed.
    assertAssignmentsRetained(regionToRegionServerMap, loadAssignmentsFromMeta(master), rsPorts);
  }

  /**
   * This tests retaining assignments on a single node restart
   */
  @Test
  public void testRetainAssignmentOnSingleRSRestart() throws Exception {
    setupCluster();
    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
    int[] rsPorts = getRegionServerPorts(threads);

    Map<RegionInfo, ServerName> regionToRegionServerMap = loadAssignmentsFromMeta(master);
    assertHostedOnKnownPorts(regionToRegionServerMap, rsPorts);

    // Server to be restarted
    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
    LOG.info("\n\nStopping HMaster and {} server", deadRS);
    // Stopping master first so that region server SCP will not be initiated
    cluster.stopMaster(0);
    cluster.waitForMasterToStop(master.getServerName(), 5000);
    cluster.stopRegionServer(deadRS);
    cluster.waitForRegionServerToStop(deadRS, 5000);

    LOG.info("\n\nSleeping a bit");
    Thread.sleep(2000);

    LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS);
    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_OF_RS);
    master = cluster.startMaster().getMaster();
    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
    cluster.startRegionServer();

    ensureServersWithSamePort(master, rsPorts);
    waitForAllRegionsAssigned();

    assertAssignmentsRetained(regionToRegionServerMap, loadAssignmentsFromMeta(master),
      new int[] { deadRS.getPort() });
  }

  /**
   * This tests the force retaining assignments upon an RS restart, even when master triggers an SCP
   */
  @Test
  public void testForceRetainAssignment() throws Exception {
    UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true);
    UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 50);
    setupCluster();
    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
    MiniHBaseCluster cluster = UTIL.getHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
    int[] rsPorts = getRegionServerPorts(threads);

    Map<RegionInfo, ServerName> regionToRegionServerMap = loadAssignmentsFromMeta(master);
    assertHostedOnKnownPorts(regionToRegionServerMap, rsPorts);

    // Server to be restarted; the master is left running in this scenario
    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
    LOG.info("\n\nStopping {} server", deadRS);
    cluster.stopRegionServer(deadRS);

    LOG.info("\n\nSleeping a bit");
    Thread.sleep(2000);

    LOG.info("\n\nStarting region server {} second time with the same port", deadRS);
    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_OF_RS);
    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
    cluster.startRegionServer();

    ensureServersWithSamePort(master, rsPorts);
    waitForAllRegionsAssigned();

    assertAssignmentsRetained(regionToRegionServerMap, loadAssignmentsFromMeta(master),
      new int[] { deadRS.getPort() });
  }

  /**
   * Starts a mini cluster with {@link HMasterForTest} and {@link #NUM_OF_RS} region servers,
   * switches the balancer off, creates every table in {@code TABLES} and waits until no region is
   * in transition.
   */
  private void setupCluster() throws Exception {
    // Set Zookeeper based connection registry since we will stop master and start a new master
    // without populating the underlying config for the connection.
    UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
    // Enable retain assignment during ServerCrashProcedure
    UTIL.getConfiguration().setBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, true);
    UTIL.startMiniCluster(StartMiniClusterOption.builder().masterClass(HMasterForTest.class)
      .numRegionServers(NUM_OF_RS).build());

    // Turn off balancer
    UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false);

    LOG.info("\n\nCreating tables");
    for (TableName table : TABLES) {
      UTIL.createTable(table, FAMILY);
    }
    for (TableName table : TABLES) {
      UTIL.waitTableEnabled(table);
    }

    UTIL.waitUntilNoRegionsInTransition(60000);
  }

  /**
   * Asserts that exactly {@link #NUM_OF_RS} region server threads are given and returns the port
   * of each, in list order.
   */
  private static int[] getRegionServerPorts(List<JVMClusterUtil.RegionServerThread> threads) {
    assertEquals(NUM_OF_RS, threads.size());
    int[] ports = new int[NUM_OF_RS];
    for (int i = 0; i < NUM_OF_RS; i++) {
      ports[i] = threads.get(i).getRegionServer().getServerName().getPort();
    }
    return ports;
  }

  /**
   * Reads the current region to server mapping through the given master's connection.
   */
  private static Map<RegionInfo, ServerName> loadAssignmentsFromMeta(HMaster master)
    throws Exception {
    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
    // use it to load all user region placements
    SnapshotOfRegionAssignmentFromMeta snapshot =
      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
    snapshot.initialize();
    return snapshot.getRegionToRegionServerMap();
  }

  /**
   * Asserts that every server holding a region in {@code assignments} uses one of
   * {@code rsPorts}.
   */
  private static void assertHostedOnKnownPorts(Map<RegionInfo, ServerName> assignments,
    int[] rsPorts) {
    for (ServerName serverName : assignments.values()) {
      boolean found = containsPort(rsPorts, serverName.getPort());
      LOG.info("Server {} has regions? {}", serverName, found);
      assertTrue("Region hosted on a server with an unexpected port: " + serverName, found);
    }
  }

  /**
   * Blocks until every table in {@code TABLES} is available and no region is in transition.
   */
  private void waitForAllRegionsAssigned() throws Exception {
    // Wait till master is initialized and all regions are assigned
    for (TableName table : TABLES) {
      UTIL.waitTableAvailable(table);
    }
    UTIL.waitUntilNoRegionsInTransition(60000);
  }

  /**
   * Asserts that every region in {@code after} sits on the same address it had in {@code before}.
   * Servers whose old port is listed in {@code restartedPorts} must show a different start code;
   * all other servers must show the same start code.
   */
  private static void assertAssignmentsRetained(Map<RegionInfo, ServerName> before,
    Map<RegionInfo, ServerName> after, int[] restartedPorts) {
    assertEquals(before.size(), after.size());
    for (Map.Entry<RegionInfo, ServerName> entry : after.entrySet()) {
      RegionInfo region = entry.getKey();
      ServerName oldServer = before.get(region);
      ServerName currentServer = entry.getValue();
      LOG.info("Key={} oldServer={}, currentServer={}", region, oldServer, currentServer);
      // Fail with a readable message instead of a NullPointerException on an unknown region.
      assertTrue("No assignment recorded before the restart for " + region, oldServer != null);
      assertEquals(region.toString(), oldServer.getAddress(), currentServer.getAddress());

      if (containsPort(restartedPorts, oldServer.getPort())) {
        // Restarted RS start code wont be same
        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
      } else {
        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
      }
    }
  }

  /**
   * Asserts that the master sees exactly {@link #NUM_OF_RS} online servers and that each port in
   * {@code rsPorts} is used by at least one of them.
   */
  private void ensureServersWithSamePort(HMaster master, int[] rsPorts) {
    // Make sure live regionservers are on the same host/port
    List<ServerName> localServers = master.getServerManager().getOnlineServersList();
    assertEquals(NUM_OF_RS, localServers.size());
    for (int port : rsPorts) {
      boolean found = false;
      for (ServerName serverName : localServers) {
        if (serverName.getPort() == port) {
          found = true;
          break;
        }
      }
      assertTrue("No online region server uses port " + port, found);
    }
  }

  /** Returns whether {@code port} occurs in {@code ports}. */
  private static boolean containsPort(int[] ports, int port) {
    for (int candidate : ports) {
      if (candidate == port) {
        return true;
      }
    }
    return false;
  }
}