001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.chaos.monkies;
019
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
038
039/**
040 * Chaos monkey that given multiple policies will run actions against the cluster.
041 */
042public class PolicyBasedChaosMonkey extends ChaosMonkey {
043  private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class);
044
045  private static final long ONE_SEC = 1000;
046  private static final long ONE_MIN = 60 * ONE_SEC;
047
048  public static final long TIMEOUT = ONE_MIN;
049
050  final IntegrationTestingUtility util;
051  final Properties monkeyProps;
052
053  private final Policy[] policies;
054  private final ExecutorService monkeyThreadPool;
055
056  /**
057   * Construct a new ChaosMonkey
058   * @param util     the HBaseIntegrationTestingUtility already configured
059   * @param policies custom policies to use
060   */
061  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
062    this(null, util, policies);
063  }
064
065  public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) {
066    this(null, util, policies);
067  }
068
069  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
070    Collection<Policy> policies) {
071    this(monkeyProps, util, policies.toArray(new Policy[0]));
072  }
073
074  public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util,
075    Policy... policies) {
076    this.monkeyProps = monkeyProps;
077    this.util = Objects.requireNonNull(util);
078    this.policies = Objects.requireNonNull(policies);
079    if (policies.length == 0) {
080      throw new IllegalArgumentException("policies may not be empty");
081    }
082    this.monkeyThreadPool = buildMonkeyThreadPool(policies.length);
083  }
084
085  private static ExecutorService buildMonkeyThreadPool(final int size) {
086    return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setDaemon(false)
087      .setNameFormat("ChaosMonkey-%d").setUncaughtExceptionHandler((t, e) -> {
088        throw new RuntimeException(e);
089      }).build());
090  }
091
092  /** Selects a random item from the given items */
093  public static <T> T selectRandomItem(T[] items) {
094    return items[ThreadLocalRandom.current().nextInt(items.length)];
095  }
096
097  /** Selects a random item from the given items with weights */
098  public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) {
099    int totalWeight = 0;
100    for (Pair<T, Integer> pair : items) {
101      totalWeight += pair.getSecond();
102    }
103
104    int cutoff = ThreadLocalRandom.current().nextInt(totalWeight);
105    int cummulative = 0;
106    T item = null;
107
108    // warn: O(n)
109    for (int i = 0; i < items.size(); i++) {
110      int curWeight = items.get(i).getSecond();
111      if (cutoff < cummulative + curWeight) {
112        item = items.get(i).getFirst();
113        break;
114      }
115      cummulative += curWeight;
116    }
117
118    return item;
119  }
120
121  /** Selects and returns ceil(ratio * items.length) random items from the given array */
122  public static <T> List<T> selectRandomItems(T[] items, float ratio) {
123    // clamp ratio to [0.0,1.0]
124    ratio = Math.max(Math.min(ratio, 1.0f), 0.0f);
125    final int selectedNumber = (int) Math.ceil(items.length * ratio);
126    final ReservoirSample<T> sample = new ReservoirSample<>(selectedNumber);
127    sample.add(Arrays.stream(items));
128    final List<T> shuffledItems = sample.getSamplingResult();
129    Collections.shuffle(shuffledItems);
130    return shuffledItems;
131  }
132
133  @Override
134  public void start() throws Exception {
135    final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util);
136    for (final Policy policy : policies) {
137      policy.init(context);
138      monkeyThreadPool.execute(policy);
139    }
140  }
141
142  @Override
143  public void stop(String why) {
144    // stop accepting new work (shouldn't be any with a fixed-size pool)
145    monkeyThreadPool.shutdown();
146    // notify all executing policies that it's time to halt.
147    for (Policy policy : policies) {
148      policy.stop(why);
149    }
150  }
151
152  @Override
153  public boolean isStopped() {
154    return monkeyThreadPool.isTerminated();
155  }
156
157  @Override
158  public void waitForStop() throws InterruptedException {
159    if (!monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
160      LOG.warn("Some pool threads failed to terminate. Forcing. {}", monkeyThreadPool);
161      monkeyThreadPool.shutdownNow();
162    }
163  }
164
165  @Override
166  public boolean isDestructive() {
167    // TODO: we can look at the actions, and decide to do the restore cluster or not based on them.
168    return true;
169  }
170}