/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Tests the restarting of everything as done during rolling restarts.
 */
@RunWith(Parameterized.class)
@Category({ MasterTests.class, LargeTests.class })
public class TestRollingRestart {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRollingRestart.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRollingRestart.class);

  private static HBaseTestingUtil TEST_UTIL;
  @Rule
  public TestName name = new TestName();

  @Parameterized.Parameter
  public boolean splitWALCoordinatedByZK;

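  /**
   * Walks a full rolling restart: starts a cluster with two masters and three region servers,
   * creates a multi-region table, adds a fourth region server, restarts both masters, and then
   * restarts each region server in turn, verifying after each step that all regions remain
   * assigned exactly once.
   */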
  @Test
  public void testBasicRollingRestart() throws Exception {

    // Start a cluster with 2 masters and 3 regionservers; a fourth RS is added later
    final int NUM_MASTERS = 2;
    final int NUM_RS = 3;
    final int NUM_REGIONS_TO_CREATE = 20;

    int expectedNumRS = 3;

    // Start the cluster
    log("Starting cluster");
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK);
    TEST_UTIL = new HBaseTestingUtil(conf);
    StartTestingClusterOption option = StartTestingClusterOption.builder().numMasters(NUM_MASTERS)
      .numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
    TEST_UTIL.startMiniCluster(option);
    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    log("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();

    // Create a table with regions
    final TableName tableName =
      TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
    byte[] family = Bytes.toBytes("family");
    log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
    Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
    int numRegions = -1;
    try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
      numRegions = r.getStartKeys().length;
    }
    numRegions += 1; // plus the catalog region (hbase:meta)
    log("Waiting for no more RIT\n");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    log("Disabling table\n");
    TEST_UTIL.getAdmin().disableTable(tableName);
    log("Waiting for no more RIT\n");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    NavigableSet<String> regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
    log("Verifying only the catalog region is assigned\n");
    if (regions.size() != 1) {
      for (String oregion : regions) {
        log("Region still online: " + oregion);
      }
    }
    assertEquals(1, regions.size());
    log("Enabling table\n");
    TEST_UTIL.getAdmin().enableTable(tableName);
    log("Waiting for no more RIT\n");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    log("Verifying there are " + numRegions + " regions assigned on the cluster\n");
    regions = HBaseTestingUtil.getAllOnlineRegions(cluster);
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Add a new regionserver
    log("Adding a fourth RS");
    RegionServerThread restarted = cluster.startRegionServer();
    expectedNumRS++;
    restarted.waitForServerOnline();
    log("Additional RS is online");
    log("Waiting for no more RIT");
    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
    log("Verifying there are " + numRegions + " regions assigned on the cluster");
    assertRegionsAssigned(cluster, regions);
    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // Master Restarts
    List<MasterThread> masterThreads = cluster.getMasterThreads();
    MasterThread activeMaster = null;
    MasterThread backupMaster = null;
    assertEquals(2, masterThreads.size());
    if (masterThreads.get(0).getMaster().isActiveMaster()) {
      activeMaster = masterThreads.get(0);
      backupMaster = masterThreads.get(1);
    } else {
      activeMaster = masterThreads.get(1);
      backupMaster = masterThreads.get(0);
    }

    // Bring down the backup master
    log("Stopping backup master\n\n");
    backupMaster.getMaster().stop("Stop of backup during rolling restart");
    cluster.hbaseCluster.waitOnMaster(backupMaster);

    // Bring down the primary master
    log("Stopping primary master\n\n");
    activeMaster.getMaster().stop("Stop of active during rolling restart");
    cluster.hbaseCluster.waitOnMaster(activeMaster);

    // Start primary master
    log("Restarting primary master\n\n");
    activeMaster = cluster.startMaster();
    cluster.waitForActiveAndReadyMaster();

    // Start backup master
    log("Restarting backup master\n\n");
    backupMaster = cluster.startMaster();

    assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());

    // RegionServer Restarts

    // Bring them down one at a time, waiting for each to complete before moving on
    List<RegionServerThread> regionServers = cluster.getLiveRegionServerThreads();
    int num = 1;
    int total = regionServers.size();
    for (RegionServerThread rst : regionServers) {
      ServerName serverName = rst.getRegionServer().getServerName();
      log("Stopping region server " + num + " of " + total + " [" + serverName + "]");
      rst.getRegionServer().stop("Stopping RS during rolling restart");
      cluster.hbaseCluster.waitOnRegionServer(rst);
      log("Waiting for RS shutdown to be handled by master");
      waitForRSShutdownToStartAndFinish(activeMaster, serverName);
      log("RS shutdown done, waiting for no more RIT");
      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
      log("Verifying there are " + numRegions + " regions assigned on the cluster");
      assertRegionsAssigned(cluster, regions);
      expectedNumRS--;
      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
      log("Restarting region server " + num + " of " + total);
      restarted = cluster.startRegionServer();
      restarted.waitForServerOnline();
      expectedNumRS++;
      log("Region server " + num + " is back online");
      log("Waiting for no more RIT");
      TEST_UTIL.waitUntilNoRegionsInTransition(60000);
      log("Verifying there are " + numRegions + " regions assigned on the cluster");
      assertRegionsAssigned(cluster, regions);
      assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
      num++;
    }
    Thread.sleep(1000);
    assertRegionsAssigned(cluster, regions);

    // TODO: Bring random 3 of 4 RS down at the same time

    ht.close();
    // Stop the cluster
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Checks whether the ServerCrashProcedure (SCP) for the given dead server has been executed.
   * @return true if the SCP for the given serverName has been executed, false if not
   */
  private boolean isDeadServerSCPExecuted(ServerName serverName) throws IOException {
    return TEST_UTIL.getMiniHBaseCluster().getMaster().getProcedures().stream()
      .anyMatch(p -> p instanceof ServerCrashProcedure
        && ((ServerCrashProcedure) p).getServerName().equals(serverName));
  }

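  /**
   * Blocks until the master has noticed that the given region server is dead and has finished
   * processing its crash: first waits for the server to appear in the dead server list, then for
   * its ServerCrashProcedure to show up in the master's procedures, and finally for dead-server
   * processing to finish.
   */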
  private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster, ServerName serverName)
    throws InterruptedException, IOException {
    ServerManager sm = activeMaster.getMaster().getServerManager();
    // First wait for it to be in dead list
    while (!sm.getDeadServers().isDeadServer(serverName)) {
      log("Waiting for [" + serverName + "] to be listed as dead in master");
      Thread.sleep(1);
    }
    log("Server [" + serverName + "] marked as dead, waiting for it to finish dead processing");

    TEST_UTIL.waitFor(60000, () -> isDeadServerSCPExecuted(serverName));

    while (sm.areDeadServersInProgress()) {
      log("Server [" + serverName + "] still being processed, waiting");
      Thread.sleep(100);
    }
    log("Server [" + serverName + "] done with server shutdown processing");
  }

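  /** Logs test progress with a distinctive "TRR" prefix so it stands out in the debug output. */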
  private void log(String msg) {
    LOG.debug("\n\nTRR: " + msg + "\n");
  }

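  /** Returns the total number of regions currently online across all live region servers. */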
  private int getNumberOfOnlineRegions(SingleProcessHBaseCluster cluster) {
    int numFound = 0;
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      numFound += rst.getRegionServer().getNumberOfOnlineRegions();
    }
    return numFound;
  }

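  /**
   * Asserts that the cluster hosts exactly the expected number of regions, logging any missing
   * or double-assigned regions before failing.
   */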
  private void assertRegionsAssigned(SingleProcessHBaseCluster cluster, Set<String> expectedRegions)
    throws IOException {
    int numFound = getNumberOfOnlineRegions(cluster);
    if (expectedRegions.size() > numFound) {
      log("Expected to find " + expectedRegions.size() + " but only found " + numFound);
      NavigableSet<String> foundRegions = HBaseTestingUtil.getAllOnlineRegions(cluster);
      for (String region : expectedRegions) {
        if (!foundRegions.contains(region)) {
          log("Missing region: " + region);
        }
      }
      assertEquals(expectedRegions.size(), numFound);
    } else if (expectedRegions.size() < numFound) {
      int doubled = numFound - expectedRegions.size();
      log("Expected to find " + expectedRegions.size() + " but found " + numFound + " (" + doubled
        + " double assignments?)");
      NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
      for (String region : doubleRegions) {
        log("Region is double assigned: " + region);
      }
      assertEquals(expectedRegions.size(), numFound);
    } else {
      log("Success! Found expected number of " + numFound + " regions");
    }
  }

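  /** Returns the names of regions reported online by more than one live region server. */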
  private NavigableSet<String> getDoubleAssignedRegions(SingleProcessHBaseCluster cluster)
    throws IOException {
    NavigableSet<String> online = new TreeSet<>();
    NavigableSet<String> doubled = new TreeSet<>();
    for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
      for (RegionInfo region : ProtobufUtil
        .getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
        if (!online.add(region.getRegionNameAsString())) {
          doubled.add(region.getRegionNameAsString());
        }
      }
    }
    return doubled;
  }

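  /**
   * Runs the whole test twice: once with ZooKeeper-coordinated WAL splitting enabled and once
   * with it disabled.
   */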
  @Parameterized.Parameters
  public static Collection<Boolean> coordinatedByZK() {
    return Arrays.asList(false, true);
  }
}