001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.Arrays; 024import java.util.Collection; 025import java.util.List; 026import java.util.NavigableSet; 027import java.util.Set; 028import java.util.TreeSet; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.HBaseTestingUtil; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 036import org.apache.hadoop.hbase.StartTestingClusterOption; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.client.RegionLocator; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.testclassification.MasterTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 046import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 047import org.junit.ClassRule; 048import org.junit.Rule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.rules.TestName; 052import org.junit.runner.RunWith; 053import org.junit.runners.Parameterized; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 058 059/** 060 * Tests the restarting of everything as done during rolling restarts. 061 */ 062@RunWith(Parameterized.class) 063@Category({ MasterTests.class, LargeTests.class }) 064public class TestRollingRestart { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestRollingRestart.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class); 071 072 private static HBaseTestingUtil TEST_UTIL; 073 @Rule 074 public TestName name = new TestName(); 075 076 @Parameterized.Parameter 077 public boolean splitWALCoordinatedByZK; 078 079 @Test 080 public void testBasicRollingRestart() throws Exception { 081 082 // Start a cluster with 2 masters and 4 regionservers 083 final int NUM_MASTERS = 2; 084 final int NUM_RS = 3; 085 final int NUM_REGIONS_TO_CREATE = 20; 086 087 int expectedNumRS = 3; 088 089 // Start the cluster 090 log("Starting cluster"); 091 Configuration conf = HBaseConfiguration.create(); 092 conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK); 093 TEST_UTIL = new HBaseTestingUtil(conf); 094 StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(NUM_MASTERS) 095 .numRegionServers(NUM_RS).numDataNodes(NUM_RS).build(); 096 TEST_UTIL.startMiniCluster(option); 097 SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); 098 log("Waiting for active/ready master"); 099 cluster.waitForActiveAndReadyMaster(); 100 101 // Create a table with regions 102 final TableName tableName = 103 TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-")); 104 byte[] family = Bytes.toBytes("family"); 105 log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions"); 106 Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE); 107 int numRegions = -1; 108 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { 109 numRegions = r.getStartKeys().length; 110 } 111 numRegions += 1; // catalogs 112 log("Waiting for no more RIT\n"); 113 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 114 log("Disabling table\n"); 115 TEST_UTIL.getAdmin().disableTable(tableName); 116 log("Waiting for no more RIT\n"); 117 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 118 NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster); 119 log("Verifying only catalog region is assigned\n"); 120 if (regions.size() != 1) { 121 for (String oregion : regions) { 122 log("Region still online: " + oregion); 123 } 124 } 125 assertEquals(1, regions.size()); 126 log("Enabling table\n"); 127 TEST_UTIL.getAdmin().enableTable(tableName); 128 log("Waiting for no more RIT\n"); 129 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 130 log("Verifying there are " + numRegions + " assigned on cluster\n"); 131 regions = HBaseTestingUtil.getAllOnlineRegions(cluster); 132 assertRegionsAssigned(cluster, regions); 133 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); 134 135 // Add a new regionserver 136 log("Adding a fourth RS"); 137 RegionServerThread restarted = cluster.startRegionServer(); 138 expectedNumRS++; 139 restarted.waitForServerOnline(); 140 log("Additional RS is online"); 141 log("Waiting for no more RIT"); 142 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 143 log("Verifying there are " + numRegions + " assigned on cluster"); 144 assertRegionsAssigned(cluster, regions); 145 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); 146 147 // Master Restarts 148 List<MasterThread> masterThreads = cluster.getMasterThreads(); 149 MasterThread activeMaster = null; 150 MasterThread backupMaster = null; 151 assertEquals(2, masterThreads.size()); 152 if (masterThreads.get(0).getMaster().isActiveMaster()) { 153 activeMaster = masterThreads.get(0); 154 backupMaster = masterThreads.get(1); 155 } else { 156 activeMaster = masterThreads.get(1); 157 backupMaster = masterThreads.get(0); 158 } 159 160 // Bring down the backup master 161 log("Stopping backup master\n\n"); 162 backupMaster.getMaster().stop("Stop of backup during rolling restart"); 163 cluster.hbaseCluster.waitOnMaster(backupMaster); 164 165 // Bring down the primary master 166 log("Stopping primary master\n\n"); 167 activeMaster.getMaster().stop("Stop of active during rolling restart"); 168 cluster.hbaseCluster.waitOnMaster(activeMaster); 169 170 // Start primary master 171 log("Restarting primary master\n\n"); 172 activeMaster = cluster.startMaster(); 173 cluster.waitForActiveAndReadyMaster(); 174 175 // Start backup master 176 log("Restarting backup master\n\n"); 177 backupMaster = cluster.startMaster(); 178 179 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); 180 181 // RegionServer Restarts 182 183 // Bring them down, one at a time, waiting between each to complete 184 List<RegionServerThread> regionServers = cluster.getLiveRegionServerThreads(); 185 int num = 1; 186 int total = regionServers.size(); 187 for (RegionServerThread rst : regionServers) { 188 ServerName serverName = rst.getRegionServer().getServerName(); 189 log("Stopping region server " + num + " of " + total + " [ " + serverName + "]"); 190 rst.getRegionServer().stop("Stopping RS during rolling restart"); 191 cluster.hbaseCluster.waitOnRegionServer(rst); 192 log("Waiting for RS shutdown to be handled by master"); 193 waitForRSShutdownToStartAndFinish(activeMaster, serverName); 194 log("RS shutdown done, waiting for no more RIT"); 195 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 196 log("Verifying there are " + numRegions + " assigned on cluster"); 197 assertRegionsAssigned(cluster, regions); 198 expectedNumRS--; 199 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); 200 log("Restarting region server " + num + " of " + total); 201 restarted = cluster.startRegionServer(); 202 restarted.waitForServerOnline(); 203 expectedNumRS++; 204 log("Region server " + num + " is back online"); 205 log("Waiting for no more RIT"); 206 TEST_UTIL.waitUntilNoRegionsInTransition(60000); 207 log("Verifying there are " + numRegions + " assigned on cluster"); 208 assertRegionsAssigned(cluster, regions); 209 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size()); 210 num++; 211 } 212 Thread.sleep(1000); 213 assertRegionsAssigned(cluster, regions); 214 215 // TODO: Bring random 3 of 4 RS down at the same time 216 217 ht.close(); 218 // Stop the cluster 219 TEST_UTIL.shutdownMiniCluster(); 220 } 221 222 /** 223 * Checks if the SCP of specific dead server has been executed. 224 * @return true if the SCP of specific serverName has been executed, false if not 225 */ 226 private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOException { 227 return TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream() 228 .anyMatch(p -> p instanceof ServerCrashProcedure 229 && ((ServerCrashProcedure) p).getServerName().equals(serverName)); 230 } 231 232 private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, ServerName serverName) 233 throws InterruptedException, IOException { 234 ServerManager sm = activeMaster.getMaster().getServerManager(); 235 // First wait for it to be in dead list 236 while (!sm.getDeadServers().isDeadServer(serverName)) { 237 log("Waiting for [" + serverName + "] to be listed as dead in master"); 238 Thread.sleep(1); 239 } 240 log( 241 "Server [" + serverName + "] marked as dead, waiting for it to " + "finish dead processing"); 242 243 TEST_UTIL.waitFor(60000, () -> isDeadServerSCPExecuted(serverName)); 244 245 while (sm.areDeadServersInProgress()) { 246 log("Server [" + serverName + "] still being processed, waiting"); 247 Thread.sleep(100); 248 } 249 log("Server [" + serverName + "] done with server shutdown processing"); 250 } 251 252 private void log(String msg) { 253 LOG.debug("\n\nTRR: " + msg + "\n"); 254 } 255 256 private int getNumberOfOnlineRegions(SingleProcessHBaseCluster cluster) { 257 int numFound = 0; 258 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 259 numFound += rst.getRegionServer().getNumberOfOnlineRegions(); 260 } 261 return numFound; 262 } 263 264 private void assertRegionsAssigned(SingleProcessHBaseCluster cluster, Set<String> expectedRegions) 265 throws IOException { 266 int numFound = getNumberOfOnlineRegions(cluster); 267 if (expectedRegions.size() > numFound) { 268 log("Expected to find " + expectedRegions.size() + " but only found" + " " + numFound); 269 NavigableSet<String> foundRegions = HBaseTestingUtil.getAllOnlineRegions(cluster); 270 for (String region : expectedRegions) { 271 if (!foundRegions.contains(region)) { 272 log("Missing region: " + region); 273 } 274 } 275 assertEquals(expectedRegions.size(), numFound); 276 } else if (expectedRegions.size() < numFound) { 277 int doubled = numFound - expectedRegions.size(); 278 log("Expected to find " + expectedRegions.size() + " but found" + " " + numFound + " (" 279 + doubled + " double assignments?)"); 280 NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster); 281 for (String region : doubleRegions) { 282 log("Region is double assigned: " + region); 283 } 284 assertEquals(expectedRegions.size(), numFound); 285 } else { 286 log("Success! Found expected number of " + numFound + " regions"); 287 } 288 } 289 290 private NavigableSet<String> getDoubleAssignedRegions(SingleProcessHBaseCluster cluster) 291 throws IOException { 292 NavigableSet<String> online = new TreeSet<>(); 293 NavigableSet<String> doubled = new TreeSet<>(); 294 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { 295 for (RegionInfo region : ProtobufUtil 296 .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) { 297 if (!online.add(region.getRegionNameAsString())) { 298 doubled.add(region.getRegionNameAsString()); 299 } 300 } 301 } 302 return doubled; 303 } 304 305 @Parameterized.Parameters 306 public static Collection coordinatedByZK() { 307 return Arrays.asList(false, true); 308 } 309}