/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.chaos.actions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClusterInterface;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyConstants;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;

/**
 * A (possibly mischievous) action that the ChaosMonkey can perform.
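 * <p>
 * Concrete actions implement {@link #perform()} and expose their own {@link Logger}. A minimal
 * sketch of a subclass (the class name here is illustrative, not an existing action):
 *
 * <pre>{@code
 * public class KillFirstRsAction extends Action {
 *   private static final Logger LOG = LoggerFactory.getLogger(KillFirstRsAction.class);
 *
 *   protected Logger getLogger() {
 *     return LOG;
 *   }
 *
 *   public void perform() throws Exception {
 *     ServerName[] servers = getCurrentServers();
 *     if (servers.length > 0) {
 *       killRs(servers[0]);
 *     }
 *   }
 * }
 * }</pre>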
 */
public abstract class Action {

  public static final String KILL_MASTER_TIMEOUT_KEY = "hbase.chaosmonkey.action.killmastertimeout";
  public static final String START_MASTER_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startmastertimeout";
  public static final String KILL_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.killrstimeout";
  public static final String START_RS_TIMEOUT_KEY = "hbase.chaosmonkey.action.startrstimeout";
  public static final String KILL_ZK_NODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killzknodetimeout";
  public static final String START_ZK_NODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startzknodetimeout";
  public static final String KILL_DATANODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killdatanodetimeout";
  public static final String START_DATANODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startdatanodetimeout";
  public static final String KILL_NAMENODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.killnamenodetimeout";
  public static final String START_NAMENODE_TIMEOUT_KEY =
    "hbase.chaosmonkey.action.startnamenodetimeout";

  protected static final long KILL_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_MASTER_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_RS_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_ZK_NODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_DATANODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long KILL_NAMENODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;
  protected static final long START_NAMENODE_TIMEOUT_DEFAULT = PolicyBasedChaosMonkey.TIMEOUT;

  protected ActionContext context;
  protected HBaseClusterInterface cluster;
  protected ClusterMetrics initialStatus;
  protected ServerName[] initialServers;
  protected Properties monkeyProps;

  protected long killMasterTimeout;
  protected long startMasterTimeout;
  protected long killRsTimeout;
  protected long startRsTimeout;
  protected long killZkNodeTimeout;
  protected long startZkNodeTimeout;
  protected long killDataNodeTimeout;
  protected long startDataNodeTimeout;
  protected long killNameNodeTimeout;
  protected long startNameNodeTimeout;
  protected boolean skipMetaRS;

  /**
   * Retrieve the instance's {@link Logger}, for use throughout the class hierarchy.
   */
  protected abstract Logger getLogger();

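  /**
   * Initialize this action from the given context. Each timeout falls back to
   * {@link PolicyBasedChaosMonkey#TIMEOUT} unless overridden through the monkey properties, e.g.
   * (a sketch; values are in milliseconds):
   *
   * <pre>{@code
   * Properties props = new Properties();
   * props.setProperty(Action.KILL_RS_TIMEOUT_KEY, "60000");
   * }</pre>
   */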
  public void init(ActionContext context) throws IOException {
    this.context = context;
    cluster = context.getHBaseCluster();
    initialStatus = cluster.getInitialClusterMetrics();
    Collection<ServerName> regionServers = initialStatus.getLiveServerMetrics().keySet();
    initialServers = regionServers.toArray(new ServerName[0]);

    monkeyProps = context.getMonkeyProps();
    if (monkeyProps == null) {
      monkeyProps = new Properties();
      IntegrationTestBase.loadMonkeyProperties(monkeyProps, cluster.getConf());
    }

    killMasterTimeout = Long.parseLong(monkeyProps.getProperty(KILL_MASTER_TIMEOUT_KEY,
      String.valueOf(KILL_MASTER_TIMEOUT_DEFAULT)));
    startMasterTimeout = Long.parseLong(monkeyProps.getProperty(START_MASTER_TIMEOUT_KEY,
      String.valueOf(START_MASTER_TIMEOUT_DEFAULT)));
    killRsTimeout = Long.parseLong(monkeyProps.getProperty(KILL_RS_TIMEOUT_KEY,
      String.valueOf(KILL_RS_TIMEOUT_DEFAULT)));
    startRsTimeout = Long.parseLong(monkeyProps.getProperty(START_RS_TIMEOUT_KEY,
      String.valueOf(START_RS_TIMEOUT_DEFAULT)));
    killZkNodeTimeout = Long.parseLong(monkeyProps.getProperty(KILL_ZK_NODE_TIMEOUT_KEY,
      String.valueOf(KILL_ZK_NODE_TIMEOUT_DEFAULT)));
    startZkNodeTimeout = Long.parseLong(monkeyProps.getProperty(START_ZK_NODE_TIMEOUT_KEY,
      String.valueOf(START_ZK_NODE_TIMEOUT_DEFAULT)));
    killDataNodeTimeout = Long.parseLong(monkeyProps.getProperty(KILL_DATANODE_TIMEOUT_KEY,
      String.valueOf(KILL_DATANODE_TIMEOUT_DEFAULT)));
    startDataNodeTimeout = Long.parseLong(monkeyProps.getProperty(START_DATANODE_TIMEOUT_KEY,
      String.valueOf(START_DATANODE_TIMEOUT_DEFAULT)));
    killNameNodeTimeout = Long.parseLong(monkeyProps.getProperty(KILL_NAMENODE_TIMEOUT_KEY,
      String.valueOf(KILL_NAMENODE_TIMEOUT_DEFAULT)));
    startNameNodeTimeout = Long.parseLong(monkeyProps.getProperty(START_NAMENODE_TIMEOUT_KEY,
      String.valueOf(START_NAMENODE_TIMEOUT_DEFAULT)));
    skipMetaRS = Boolean.parseBoolean(monkeyProps.getProperty(MonkeyConstants.SKIP_META_RS,
      String.valueOf(MonkeyConstants.DEFAULT_SKIP_META_RS)));
  }

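  /**
   * Perform the chaos action. The base implementation is a no-op; concrete actions override this
   * with the actual disruption.
   */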
  public void perform() throws Exception {
  }

  /**
   * Returns the current live region servers, excluding the active and backup masters and, when
   * {@code skipMetaRS} is set, the server hosting hbase:meta.
   */
  protected ServerName[] getCurrentServers() throws IOException {
    ClusterMetrics clusterStatus = cluster.getClusterMetrics();
    Collection<ServerName> regionServers = clusterStatus.getLiveServerMetrics().keySet();
    int count = regionServers.size();
    if (count <= 0) {
      return new ServerName[] {};
    }
    ServerName master = clusterStatus.getMasterName();
    Set<ServerName> masters = new HashSet<>();
    masters.add(master);
    masters.addAll(clusterStatus.getBackupMasterNames());
    ArrayList<ServerName> tmp = new ArrayList<>(count);
    tmp.addAll(regionServers);
    tmp.removeAll(masters);

    if (skipMetaRS) {
      ServerName metaServer = cluster.getServerHoldingMeta();
      tmp.remove(metaServer);
    }

    return tmp.toArray(new ServerName[0]);
  }

  protected void killMaster(ServerName server) throws IOException {
    getLogger().info("Killing master {}", server);
    cluster.killMaster(server);
    cluster.waitForMasterToStop(server, killMasterTimeout);
    getLogger().info("Killed master {}", server);
  }

  protected void startMaster(ServerName server) throws IOException {
    getLogger().info("Starting master {}", server.getHostname());
    cluster.startMaster(server.getHostname(), server.getPort());
    cluster.waitForActiveAndReadyMaster(startMasterTimeout);
    getLogger().info("Started master {}", server.getHostname());
  }

  protected void stopRs(ServerName server) throws IOException {
    getLogger().info("Stopping regionserver {}", server);
    cluster.stopRegionServer(server);
    cluster.waitForRegionServerToStop(server, killRsTimeout);
    getLogger().info("Stopped regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void suspendRs(ServerName server) throws IOException {
    getLogger().info("Suspending regionserver {}", server);
    cluster.suspendRegionServer(server);
    if (!(cluster instanceof SingleProcessHBaseCluster)) {
      cluster.waitForRegionServerToStop(server, killRsTimeout);
    }
    getLogger().info("Suspended regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void resumeRs(ServerName server) throws IOException {
    getLogger().info("Resuming regionserver {}", server);
    cluster.resumeRegionServer(server);
    if (!(cluster instanceof SingleProcessHBaseCluster)) {
      cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout);
    }
    getLogger().info("Resumed regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void killRs(ServerName server) throws IOException {
    getLogger().info("Killing regionserver {}", server);
    cluster.killRegionServer(server);
    cluster.waitForRegionServerToStop(server, killRsTimeout);
    getLogger().info("Killed regionserver {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startRs(ServerName server) throws IOException {
    getLogger().info("Starting regionserver {}", server.getAddress());
    cluster.startRegionServer(server.getHostname(), server.getPort());
    cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), startRsTimeout);
    getLogger().info("Started regionserver {}. Reported num of rs:{}", server.getAddress(),
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void killZKNode(ServerName server) throws IOException {
    getLogger().info("Killing zookeeper node {}", server);
    cluster.killZkNode(server);
    cluster.waitForZkNodeToStop(server, killZkNodeTimeout);
    getLogger().info("Killed zookeeper node {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startZKNode(ServerName server) throws IOException {
    getLogger().info("Starting zookeeper node {}", server.getHostname());
    cluster.startZkNode(server.getHostname(), server.getPort());
    cluster.waitForZkNodeToStart(server, startZkNodeTimeout);
    getLogger().info("Started zookeeper node {}", server);
  }

  protected void killDataNode(ServerName server) throws IOException {
    getLogger().info("Killing datanode {}", server);
    cluster.killDataNode(server);
    cluster.waitForDataNodeToStop(server, killDataNodeTimeout);
    getLogger().info("Killed datanode {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startDataNode(ServerName server) throws IOException {
    getLogger().info("Starting datanode {}", server.getHostname());
    cluster.startDataNode(server);
    cluster.waitForDataNodeToStart(server, startDataNodeTimeout);
    getLogger().info("Started datanode {}", server);
  }

  protected void killNameNode(ServerName server) throws IOException {
    getLogger().info("Killing namenode {}", server.getHostname());
    cluster.killNameNode(server);
    cluster.waitForNameNodeToStop(server, killNameNodeTimeout);
    getLogger().info("Killed namenode {}. Reported num of rs:{}", server,
      cluster.getClusterMetrics().getLiveServerMetrics().size());
  }

  protected void startNameNode(ServerName server) throws IOException {
    getLogger().info("Starting namenode {}", server.getHostname());
    cluster.startNameNode(server);
    cluster.waitForNameNodeToStart(server, startNameNodeTimeout);
    getLogger().info("Started namenode {}", server);
  }

  protected void killJournalNode(ServerName server) throws IOException {
    getLogger().info("Killing journalnode {}", server.getHostname());
    cluster.killJournalNode(server);
    // There is no dedicated journalnode timeout key, so journalnode operations reuse the
    // namenode timeouts.
    cluster.waitForJournalNodeToStop(server, killNameNodeTimeout);
    getLogger().info("Killed journalnode {}", server);
  }

  protected void startJournalNode(ServerName server) throws IOException {
    getLogger().info("Starting journalnode {}", server.getHostname());
    cluster.startJournalNode(server);
    cluster.waitForJournalNodeToStart(server, startNameNodeTimeout);
    getLogger().info("Started journalnode {}", server);
  }

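  /**
   * Move roughly {@code fractionOfRegions} of the regions reported on each live server to
   * randomly chosen target servers, stopping early if the monkey is shutting down.
   */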
  protected void unbalanceRegions(ClusterMetrics clusterStatus, List<ServerName> fromServers,
    List<ServerName> toServers, double fractionOfRegions) throws Exception {
    List<byte[]> victimRegions = new ArrayList<>();
    for (Map.Entry<ServerName, ServerMetrics> entry : clusterStatus.getLiveServerMetrics()
      .entrySet()) {
      ServerName sn = entry.getKey();
      ServerMetrics serverLoad = entry.getValue();
      // Copy the region names so victims can be removed at random without mutating the
      // metrics view.
      List<byte[]> regions = new ArrayList<>(serverLoad.getRegionMetrics().keySet());
      int victimRegionCount = (int) Math.ceil(fractionOfRegions * regions.size());
      getLogger().debug("Removing {} regions from {}", victimRegionCount, sn);
      Random rand = ThreadLocalRandom.current();
      for (int i = 0; i < victimRegionCount; ++i) {
        int victimIx = rand.nextInt(regions.size());
        String regionId = RegionInfo.encodeRegionName(regions.remove(victimIx));
        victimRegions.add(Bytes.toBytes(regionId));
      }
    }

    getLogger().info("Moving {} regions from {} servers to {} different servers",
      victimRegions.size(), fromServers.size(), toServers.size());
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    Random rand = ThreadLocalRandom.current();
    for (byte[] victimRegion : victimRegions) {
      // Don't keep moving regions if we're trying to stop the monkey.
      if (context.isStopping()) {
        break;
      }
      int targetIx = rand.nextInt(toServers.size());
      admin.move(victimRegion, toServers.get(targetIx));
    }
  }

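  /**
   * Run the balancer once, logging a warning on any exception and an error if the balancer
   * reports that it did not run.
   */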
  protected void forceBalancer() throws Exception {
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    boolean result = false;
    try {
      result = admin.balance();
    } catch (Exception e) {
      getLogger().warn("Got exception while running balancer", e);
    }
    if (!result) {
      getLogger().error("Balancer didn't succeed");
    }
  }

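  /**
   * Switch the balancer on or off, logging (but otherwise swallowing) any exception.
   */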
  protected void setBalancer(boolean onOrOff, boolean synchronous) throws Exception {
    Admin admin = this.context.getHBaseIntegrationTestingUtility().getAdmin();
    try {
      admin.balancerSwitch(onOrOff, synchronous);
    } catch (Exception e) {
      getLogger().warn("Got exception while switching balancer", e);
    }
  }

  public Configuration getConf() {
    return cluster.getConf();
  }

  /**
   * Apply a transform to all column families of the given table. Does nothing if the table has no
   * column families or if the context is stopping.
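   * <p>
   * For example, to adjust every family while logging its name (a sketch; {@code setBlocksize} is
   * a standard {@link ColumnFamilyDescriptorBuilder} method):
   *
   * <pre>{@code
   * modifyAllTableColumns(tableName, (name, cfd) -> {
   *   getLogger().info("Modifying column family {}", name);
   *   cfd.setBlocksize(65536);
   * });
   * }</pre>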
   * @param tableName the table to modify
   * @param transform the modification to perform; callers receive the column family name as a
   *                  string and a column family descriptor builder
   */
  protected void modifyAllTableColumns(TableName tableName,
    BiConsumer<String, ColumnFamilyDescriptorBuilder> transform) throws IOException {
    HBaseTestingUtil util = this.context.getHBaseIntegrationTestingUtility();
    Admin admin = util.getAdmin();

    TableDescriptor tableDescriptor = admin.getDescriptor(tableName);
    ColumnFamilyDescriptor[] columnDescriptors = tableDescriptor.getColumnFamilies();

    if (columnDescriptors == null || columnDescriptors.length == 0) {
      return;
    }

    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);
    for (ColumnFamilyDescriptor descriptor : columnDescriptors) {
      ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(descriptor);
      transform.accept(descriptor.getNameAsString(), cfd);
      builder.modifyColumnFamily(cfd.build());
    }

    // Don't try the modify if we're stopping
    if (this.context.isStopping()) {
      return;
    }
    admin.modifyTable(builder.build());
  }

  /**
   * Apply a transform to all column families of the given table. Does nothing if the table has no
   * column families or if the context is stopping.
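   * <p>
   * For example (a sketch; {@code setMaxVersions} is a standard
   * {@link ColumnFamilyDescriptorBuilder} method):
   *
   * <pre>{@code
   * modifyAllTableColumns(tableName, cfd -> cfd.setMaxVersions(1));
   * }</pre>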
   * @param tableName the table to modify
   * @param transform the modification to perform on each column family descriptor builder
   */
  protected void modifyAllTableColumns(TableName tableName,
    Consumer<ColumnFamilyDescriptorBuilder> transform) throws IOException {
    modifyAllTableColumns(tableName, (name, cfd) -> transform.accept(cfd));
  }

  /**
   * Context for Actions.
   */
  public static class ActionContext {
    private IntegrationTestingUtility util;
    private Properties monkeyProps = null;

    public ActionContext(IntegrationTestingUtility util) {
      this.util = util;
    }

    public ActionContext(Properties monkeyProps, IntegrationTestingUtility util) {
      this.util = util;
      this.monkeyProps = monkeyProps;
    }

    public Properties getMonkeyProps() {
      return monkeyProps;
    }

    public IntegrationTestingUtility getHBaseIntegrationTestingUtility() {
      return util;
    }

    public HBaseClusterInterface getHBaseCluster() {
      return util.getHBaseClusterInterface();
    }

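    /**
     * Whether the chaos monkey is shutting down. The base context never stops; subclasses may
     * override this to signal shutdown.
     */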
    public boolean isStopping() {
      return false;
    }
  }
}