001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;
021import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT_WAIT_INTERVAL;
022import static org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertNotEquals;
025import static org.junit.Assert.assertTrue;
026
027import java.io.IOException;
028import java.io.UncheckedIOException;
029import java.util.List;
030import java.util.Map;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
036import org.apache.hadoop.hbase.StartTestingClusterOption;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.testclassification.MasterTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.JVMClusterUtil;
042import org.junit.ClassRule;
043import org.junit.Test;
044import org.junit.experimental.categories.Category;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048@Category({ MasterTests.class, MediumTests.class })
049public class TestRetainAssignmentOnRestart extends AbstractTestRestartCluster {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestRetainAssignmentOnRestart.class);
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestRetainAssignmentOnRestart.class);
056
057  private static int NUM_OF_RS = 3;
058
059  public static final class HMasterForTest extends HMaster {
060
061    public HMasterForTest(Configuration conf) throws IOException {
062      super(conf);
063    }
064
065    @Override
066    protected void startProcedureExecutor() throws IOException {
067      // only start procedure executor when we have all the regionservers ready to take regions
068      new Thread(() -> {
069        for (;;) {
070          if (getServerManager().createDestinationServersList().size() == NUM_OF_RS) {
071            try {
072              HMasterForTest.super.startProcedureExecutor();
073            } catch (IOException e) {
074              throw new UncheckedIOException(e);
075            }
076            break;
077          }
078          try {
079            Thread.sleep(1000);
080          } catch (InterruptedException e) {
081          }
082        }
083      }).start();
084    }
085  }
086
087  @Override
088  protected boolean splitWALCoordinatedByZk() {
089    return true;
090  }
091
092  /**
093   * This tests retaining assignments on a cluster restart
094   */
095  @Test
096  public void testRetainAssignmentOnClusterRestart() throws Exception {
097    setupCluster();
098    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
099    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
100    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
101    assertEquals(NUM_OF_RS, threads.size());
102    int[] rsPorts = new int[NUM_OF_RS];
103    for (int i = 0; i < NUM_OF_RS; i++) {
104      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
105    }
106
107    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
108    // use it to load all user region placements
109    SnapshotOfRegionAssignmentFromMeta snapshot =
110      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
111    snapshot.initialize();
112    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
113    for (ServerName serverName : regionToRegionServerMap.values()) {
114      boolean found = false; // Test only, no need to optimize
115      for (int k = 0; k < NUM_OF_RS && !found; k++) {
116        found = serverName.getPort() == rsPorts[k];
117      }
118      assertTrue(found);
119    }
120
121    LOG.info("\n\nShutting down HBase cluster");
122    cluster.stopMaster(0);
123    cluster.shutdown();
124    cluster.waitUntilShutDown();
125
126    LOG.info("\n\nSleeping a bit");
127    Thread.sleep(2000);
128
129    LOG.info("\n\nStarting cluster the second time with the same ports");
130    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
131    master = cluster.startMaster().getMaster();
132    for (int i = 0; i < NUM_OF_RS; i++) {
133      cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, rsPorts[i]);
134      cluster.startRegionServer();
135    }
136
137    ensureServersWithSamePort(master, rsPorts);
138
139    // Wait till master is initialized and all regions are assigned
140    for (TableName TABLE : TABLES) {
141      UTIL.waitTableAvailable(TABLE);
142    }
143    UTIL.waitUntilNoRegionsInTransition(60000);
144
145    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
146    snapshot.initialize();
147    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
148    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
149    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
150      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
151      ServerName currentServer = entry.getValue();
152      LOG.info(
153        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
154      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
155      assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
156    }
157  }
158
159  /**
160   * This tests retaining assignments on a single node restart
161   */
162  @Test
163  public void testRetainAssignmentOnSingleRSRestart() throws Exception {
164    setupCluster();
165    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
166    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
167    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
168    assertEquals(NUM_OF_RS, threads.size());
169    int[] rsPorts = new int[NUM_OF_RS];
170    for (int i = 0; i < NUM_OF_RS; i++) {
171      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
172    }
173
174    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
175    // use it to load all user region placements
176    SnapshotOfRegionAssignmentFromMeta snapshot =
177      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
178    snapshot.initialize();
179    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
180    for (ServerName serverName : regionToRegionServerMap.values()) {
181      boolean found = false; // Test only, no need to optimize
182      for (int k = 0; k < NUM_OF_RS && !found; k++) {
183        found = serverName.getPort() == rsPorts[k];
184      }
185      assertTrue(found);
186    }
187
188    // Server to be restarted
189    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
190    LOG.info("\n\nStopping HMaster and {} server", deadRS);
191    // Stopping master first so that region server SCP will not be initiated
192    cluster.stopMaster(0);
193    cluster.waitForMasterToStop(master.getServerName(), 5000);
194    cluster.stopRegionServer(deadRS);
195    cluster.waitForRegionServerToStop(deadRS, 5000);
196
197    LOG.info("\n\nSleeping a bit");
198    Thread.sleep(2000);
199
200    LOG.info("\n\nStarting HMaster and region server {} second time with the same port", deadRS);
201    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
202    master = cluster.startMaster().getMaster();
203    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
204    cluster.startRegionServer();
205
206    ensureServersWithSamePort(master, rsPorts);
207
208    // Wait till master is initialized and all regions are assigned
209    for (TableName TABLE : TABLES) {
210      UTIL.waitTableAvailable(TABLE);
211    }
212    UTIL.waitUntilNoRegionsInTransition(60000);
213
214    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
215    snapshot.initialize();
216    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
217    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
218    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
219      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
220      ServerName currentServer = entry.getValue();
221      LOG.info(
222        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
223      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
224
225      if (deadRS.getPort() == oldServer.getPort()) {
226        // Restarted RS start code wont be same
227        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
228      } else {
229        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
230      }
231    }
232  }
233
234  /**
235   * This tests the force retaining assignments upon an RS restart, even when master triggers an SCP
236   */
237  @Test
238  public void testForceRetainAssignment() throws Exception {
239    UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true);
240    UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 50);
241    setupCluster();
242    HMaster master = UTIL.getMiniHBaseCluster().getMaster();
243    SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
244    List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
245    assertEquals(NUM_OF_RS, threads.size());
246    int[] rsPorts = new int[NUM_OF_RS];
247    for (int i = 0; i < NUM_OF_RS; i++) {
248      rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
249    }
250
251    // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
252    // use it to load all user region placements
253    SnapshotOfRegionAssignmentFromMeta snapshot =
254      new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
255    snapshot.initialize();
256    Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
257    for (ServerName serverName : regionToRegionServerMap.values()) {
258      boolean found = false; // Test only, no need to optimize
259      for (int k = 0; k < NUM_OF_RS && !found; k++) {
260        found = serverName.getPort() == rsPorts[k];
261      }
262      LOG.info("Server {} has regions? {}", serverName, found);
263      assertTrue(found);
264    }
265
266    // Server to be restarted
267    ServerName deadRS = threads.get(0).getRegionServer().getServerName();
268    LOG.info("\n\nStopping {} server", deadRS);
269    cluster.stopRegionServer(deadRS);
270
271    LOG.info("\n\nSleeping a bit");
272    Thread.sleep(2000);
273
274    LOG.info("\n\nStarting region server {} second time with the same port", deadRS);
275    cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
276    cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
277    cluster.startRegionServer();
278
279    ensureServersWithSamePort(master, rsPorts);
280
281    // Wait till master is initialized and all regions are assigned
282    for (TableName TABLE : TABLES) {
283      UTIL.waitTableAvailable(TABLE);
284    }
285    UTIL.waitUntilNoRegionsInTransition(60000);
286    snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
287    snapshot.initialize();
288    Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
289    assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
290    for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
291      ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
292      ServerName currentServer = entry.getValue();
293      LOG.info(
294        "Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
295      assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());
296
297      if (deadRS.getPort() == oldServer.getPort()) {
298        // Restarted RS start code wont be same
299        assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
300      } else {
301        assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
302      }
303    }
304  }
305
306  private void setupCluster() throws Exception, IOException, InterruptedException {
307    // Set Zookeeper based connection registry since we will stop master and start a new master
308    // without populating the underlying config for the connection.
309    UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
310      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
311    // Enable retain assignment during ServerCrashProcedure
312    UTIL.getConfiguration().setBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, true);
313    UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
314      .numRegionServers(NUM_OF_RS).build());
315
316    // Turn off balancer
317    UTIL.getMiniHBaseCluster().getMaster().getMasterRpcServices().synchronousBalanceSwitch(false);
318
319    LOG.info("\n\nCreating tables");
320    for (TableName TABLE : TABLES) {
321      UTIL.createTable(TABLE, FAMILY);
322    }
323    for (TableName TABLE : TABLES) {
324      UTIL.waitTableEnabled(TABLE);
325    }
326
327    UTIL.getMiniHBaseCluster().getMaster();
328    UTIL.waitUntilNoRegionsInTransition(60000);
329  }
330
331  private void ensureServersWithSamePort(HMaster master, int[] rsPorts) {
332    // Make sure live regionservers are on the same host/port
333    List<ServerName> localServers = master.getServerManager().getOnlineServersList();
334    assertEquals(NUM_OF_RS, localServers.size());
335    for (int i = 0; i < NUM_OF_RS; i++) {
336      boolean found = false;
337      for (ServerName serverName : localServers) {
338        if (serverName.getPort() == rsPorts[i]) {
339          found = true;
340          break;
341        }
342      }
343      assertTrue(found);
344    }
345  }
346}