001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.chaos.monkies;
019
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
038
039/**
040 * Chaos monkey that given multiple policies will run actions against the cluster.
041 */
042public class PolicyBasedChaosMonkey extends ChaosMonkey {
043  private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class);
044
045  private static final long ONE_SEC = 1000;
046  private static final long ONE_MIN = 60 * ONE_SEC;
047
048  public static final long TIMEOUT = ONE_MIN;
049
050  final IntegrationTestingUtility util;
051  final Properties monkeyProps;
052
053  private final Policy[] policies;
054  private final ExecutorService monkeyThreadPool;
055
056  /**
057   * Construct a new ChaosMonkey
058   * @param util     the HBaseIntegrationTestingUtility already configured
059   * @param policies custom policies to use
060   */
061  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
062    this(null, util, policies);
063  }
064
065  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
066    this(null, util, policies);
067  }
068
069  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
070    Collection<Policy> policies) {
071    this(monkeyProps, util, policies.toArray(new Policy[0]));
072  }
073
074  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
075    Policy... policies) {
076    this.monkeyProps = monkeyProps;
077    this.util = Objects.requireNonNull(util);
078    this.policies = Objects.requireNonNull(policies);
079    if (policies.length == 0) {
080      throw new IllegalArgumentException("policies may not be empty");
081    }
082    this.monkeyThreadPool = buildMonkeyThreadPool(policies.length);
083  }
084
085  private static ExecutorService buildMonkeyThreadPool(final int size) {
086    return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setDaemon(false)
087      .setNameFormat("ChaosMonkey-%d").setUncaughtExceptionHandler((t, e) -> {
088        LOG.error("Uncaught exception in thread {}", t.getName(), e);
089        throw new RuntimeException(e);
090      }).build());
091  }
092
093  /** Selects a random item from the given items */
094  public static <T> T selectRandomItem(T[] items) {
095    return items[ThreadLocalRandom.current().nextInt(items.length)];
096  }
097
098  /** Selects a random item from the given items with weights */
099  public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
100    int totalWeight = 0;
101    for (Pair<T, Integer> pair : items) {
102      totalWeight += pair.getSecond();
103    }
104
105    int cutoff = ThreadLocalRandom.current().nextInt(totalWeight);
106    int cummulative = 0;
107    T item = null;
108
109    // warn: O(n)
110    for (int i = 0; i < items.size(); i++) {
111      int curWeight = items.get(i).getSecond();
112      if (cutoff < cummulative + curWeight) {
113        item = items.get(i).getFirst();
114        break;
115      }
116      cummulative += curWeight;
117    }
118
119    return item;
120  }
121
122  /** Selects and returns ceil(ratio * items.length) random items from the given array */
123  public static <T> List<T> selectRandomItems(T[] items, float ratio) {
124    // clamp ratio to [0.0,1.0]
125    ratio = Math.max(Math.min(ratio, 1.0f), 0.0f);
126    final int selectedNumber = (int) Math.ceil(items.length * ratio);
127    final ReservoirSample<T> sample = new ReservoirSample<>(selectedNumber);
128    sample.add(Arrays.stream(items));
129    final List<T> shuffledItems = sample.getSamplingResult();
130    Collections.shuffle(shuffledItems);
131    return shuffledItems;
132  }
133
134  @Override
135  public void start() throws Exception {
136    final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util);
137    for (final Policy policy : policies) {
138      policy.init(context);
139      monkeyThreadPool.execute(policy);
140    }
141  }
142
143  @Override
144  public void stop(String why) {
145    // stop accepting new work (shouldn't be any with a fixed-size pool)
146    monkeyThreadPool.shutdown();
147    // notify all executing policies that it's time to halt.
148    for (Policy policy : policies) {
149      policy.stop(why);
150    }
151  }
152
153  @Override
154  public boolean isStopped() {
155    return monkeyThreadPool.isTerminated();
156  }
157
158  @Override
159  public void waitForStop() throws InterruptedException {
160    if (!monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
161      LOG.warn("Some pool threads failed to terminate. Forcing. {}", monkeyThreadPool);
162      monkeyThreadPool.shutdownNow();
163    }
164  }
165
166  @Override
167  public boolean isDestructive() {
168    // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
169    return true;
170  }
171}