001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.chaos.monkies; 019 020import java.util.Arrays; 021import java.util.Collection; 022import java.util.Collections; 023import java.util.List; 024import java.util.Objects; 025import java.util.Properties; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.ThreadLocalRandom; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.hbase.IntegrationTestingUtility; 031import org.apache.hadoop.hbase.chaos.policies.Policy; 032import org.apache.hadoop.hbase.util.Pair; 033import org.apache.hadoop.hbase.util.ReservoirSample; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 038 039/** 040 * Chaos monkey that given multiple policies will run actions against the cluster. 041 */ 042public class PolicyBasedChaosMonkey extends ChaosMonkey { 043 private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class); 044 045 private static final long ONE_SEC = 1000; 046 private static final long ONE_MIN = 60 * ONE_SEC; 047 048 public static final long TIMEOUT = ONE_MIN; 049 050 final IntegrationTestingUtility util; 051 final Properties monkeyProps; 052 053 private final Policy[] policies; 054 private final ExecutorService monkeyThreadPool; 055 056 /** 057 * Construct a new ChaosMonkey 058 * @param util the HBaseIntegrationTestingUtility already configured 059 * @param policies custom policies to use 060 */ 061 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) { 062 this(null, util, policies); 063 } 064 065 public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) { 066 this(null, util, policies); 067 } 068 069 public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util, 070 Collection<Policy> policies) { 071 this(monkeyProps, util, policies.toArray(new Policy[0])); 072 } 073 074 public PolicyBasedChaosMonkey(Properties monkeyProps, IntegrationTestingUtility util, 075 Policy... policies) { 076 this.monkeyProps = monkeyProps; 077 this.util = Objects.requireNonNull(util); 078 this.policies = Objects.requireNonNull(policies); 079 if (policies.length == 0) { 080 throw new IllegalArgumentException("policies may not be empty"); 081 } 082 this.monkeyThreadPool = buildMonkeyThreadPool(policies.length); 083 } 084 085 private static ExecutorService buildMonkeyThreadPool(final int size) { 086 return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setDaemon(false) 087 .setNameFormat("ChaosMonkey-%d").setUncaughtExceptionHandler((t, e) -> { 088 throw new RuntimeException(e); 089 }).build()); 090 } 091 092 /** Selects a random item from the given items */ 093 public static <T> T selectRandomItem(T[] items) { 094 return items[ThreadLocalRandom.current().nextInt(items.length)]; 095 } 096 097 /** Selects a random item from the given items with weights */ 098 public static <T> T selectWeightedRandomItem(List<Pair<T, Integer>> items) { 099 int totalWeight = 0; 100 for (Pair<T, Integer> pair : items) { 101 totalWeight += pair.getSecond(); 102 } 103 104 int cutoff = ThreadLocalRandom.current().nextInt(totalWeight); 105 int cummulative = 0; 106 T item = null; 107 108 // warn: O(n) 109 for (int i = 0; i < items.size(); i++) { 110 int curWeight = items.get(i).getSecond(); 111 if (cutoff < cummulative + curWeight) { 112 item = items.get(i).getFirst(); 113 break; 114 } 115 cummulative += curWeight; 116 } 117 118 return item; 119 } 120 121 /** Selects and returns ceil(ratio * items.length) random items from the given array */ 122 public static <T> List<T> selectRandomItems(T[] items, float ratio) { 123 // clamp ratio to [0.0,1.0] 124 ratio = Math.max(Math.min(ratio, 1.0f), 0.0f); 125 final int selectedNumber = (int) Math.ceil(items.length * ratio); 126 final ReservoirSample<T> sample = new ReservoirSample<>(selectedNumber); 127 sample.add(Arrays.stream(items)); 128 final List<T> shuffledItems = sample.getSamplingResult(); 129 Collections.shuffle(shuffledItems); 130 return shuffledItems; 131 } 132 133 @Override 134 public void start() throws Exception { 135 final Policy.PolicyContext context = new Policy.PolicyContext(monkeyProps, util); 136 for (final Policy policy : policies) { 137 policy.init(context); 138 monkeyThreadPool.execute(policy); 139 } 140 } 141 142 @Override 143 public void stop(String why) { 144 // stop accepting new work (shouldn't be any with a fixed-size pool) 145 monkeyThreadPool.shutdown(); 146 // notify all executing policies that it's time to halt. 147 for (Policy policy : policies) { 148 policy.stop(why); 149 } 150 } 151 152 @Override 153 public boolean isStopped() { 154 return monkeyThreadPool.isTerminated(); 155 } 156 157 @Override 158 public void waitForStop() throws InterruptedException { 159 if (!monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES)) { 160 LOG.warn("Some pool threads failed to terminate. Forcing. {}", monkeyThreadPool); 161 monkeyThreadPool.shutdownNow(); 162 } 163 } 164 165 @Override 166 public boolean isDestructive() { 167 // TODO: we can look at the actions, and decide to do the restore cluster or not based on them. 168 return true; 169 } 170}