/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Suspend then resume a ratio of the region servers in a rolling fashion. At each step, either
 * suspend a server or resume one, sleeping {@code sleepTime} milliseconds in between steps. The
 * {@code maxSuspendedServers} parameter limits the maximum number of servers that can be
 * suspended at the same time.
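 * <p>
 * A minimal usage sketch with illustrative values: suspend roughly 30% of the region servers,
 * sleeping 5 seconds between steps, with at most 2 servers suspended at any point in time.
 *
 * <pre>
 * Action action = new RollingBatchSuspendResumeRsAction(5000, 0.3f, 2);
 * </pre>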
 */
public class RollingBatchSuspendResumeRsAction extends Action {
  private static final Logger LOG =
    LoggerFactory.getLogger(RollingBatchSuspendResumeRsAction.class);
  private final float ratio;
  private final long sleepTime;
  private final int maxSuspendedServers; // maximum number of suspended servers at any given time.

  public RollingBatchSuspendResumeRsAction(long sleepTime, float ratio) {
    this(sleepTime, ratio, 5);
  }

  public RollingBatchSuspendResumeRsAction(long sleepTime, float ratio, int maxSuspendedServers) {
    this.ratio = ratio;
    this.sleepTime = sleepTime;
    this.maxSuspendedServers = maxSuspendedServers;
  }

  enum SuspendOrResume {
    SUSPEND,
    RESUME
  }

  @Override
  protected Logger getLogger() {
    return LOG;
  }

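  /**
   * Restart any resumed region server that is no longer reported among the live servers. A server
   * may have crashed after being resumed because its session expired while it was suspended; see
   * HBASE-29206.
   */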
  private void confirmResumed(Set<ServerName> resumedServers) {
    if (resumedServers.isEmpty()) {
      return;
    }
    try {
      Set<Address> addrs =
        resumedServers.stream().map(ServerName::getAddress).collect(Collectors.toSet());
      cluster.getClusterMetrics().getLiveServerMetrics().keySet().stream()
        .map(ServerName::getAddress).forEach(addrs::remove);
      for (Address addr : addrs) {
        LOG.warn("Region server {} crashed after resuming, starting it", addr);
        startRs(ServerName.valueOf(addr, -1));
      }
    } catch (IOException e) {
      LOG.warn("Failed to check liveness for region servers {}", resumedServers, e);
    }
  }

  @Override
  public void perform() throws Exception {
    getLogger().info("Performing action: Rolling batch suspending {}% of region servers",
      (int) (ratio * 100));
    List<ServerName> selectedServers = selectServers();
    Queue<ServerName> serversToBeSuspended = new ArrayDeque<>(selectedServers);
    Queue<ServerName> suspendedServers = new ArrayDeque<>();
    // After resuming, the region server will usually crash soon afterwards because its session
    // has expired, and if the region server is not started by 'autostart', it will stay down
    // forever. So here we record these region servers and make sure that they are all alive before
    // exiting this action. See HBASE-29206 for more details.
    Set<ServerName> resumedServers = new HashSet<>();
    Random rand = ThreadLocalRandom.current();
    // loop while there are servers to be suspended or suspended servers to be resumed
    while (
      (!serversToBeSuspended.isEmpty() || !suspendedServers.isEmpty()) && !context.isStopping()
    ) {
      final SuspendOrResume action;
      if (serversToBeSuspended.isEmpty()) { // no more servers to suspend
        action = SuspendOrResume.RESUME;
      } else if (suspendedServers.isEmpty()) {
        action = SuspendOrResume.SUSPEND; // no more servers to resume
      } else if (suspendedServers.size() >= maxSuspendedServers) {
        // we have too many suspended servers. Don't suspend any more
        action = SuspendOrResume.RESUME;
      } else {
        // do a coin toss
        action = rand.nextBoolean() ? SuspendOrResume.SUSPEND : SuspendOrResume.RESUME;
      }
      ServerName server;
      switch (action) {
        case SUSPEND:
          server = serversToBeSuspended.remove();
          try {
            suspendRs(server);
          } catch (Shell.ExitCodeException e) {
            LOG.warn("Problem suspending but presume successful; code={}", e.getExitCode(), e);
          }
          suspendedServers.add(server);
          break;
        case RESUME:
          server = suspendedServers.remove();
          try {
            resumeRs(server);
          } catch (Shell.ExitCodeException e) {
            LOG.info("Problem resuming, will retry; code={}", e.getExitCode(), e);
          }
          resumedServers.add(server);
          break;
      }

      getLogger().info("Sleeping for: {} ms", sleepTime);
      Threads.sleep(sleepTime);
      confirmResumed(resumedServers);
    }
  }

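  /**
   * Selects the region servers to cycle through: by default a random {@code ratio} fraction of the
   * current region servers. Protected so that subclasses may override the selection.
   */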
  protected List<ServerName> selectServers() throws IOException {
    return PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), ratio);
  }

}