001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.lang.reflect.Constructor;
022import java.util.ArrayDeque;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ThreadLocalRandom;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.HBaseInterfaceAudience;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.RegionMetrics;
035import org.apache.hadoop.hbase.ServerMetrics;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.BalancerDecision;
039import org.apache.hadoop.hbase.client.BalancerRejection;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.master.RackManager;
042import org.apache.hadoop.hbase.master.RegionPlan;
043import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
044import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
045import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.Pair;
048import org.apache.hadoop.hbase.util.ReflectionUtils;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 * <p>
 * This is a best effort load balancer. Given a cost function F(C) =&gt; x, it randomly tries to
 * mutate the cluster C into Cprime. If F(Cprime) &lt; F(C) then the new cluster state becomes the
 * plan. It includes cost functions to compute the cost of:
058 * </p>
059 * <ul>
060 * <li>Region Load</li>
061 * <li>Table Load</li>
062 * <li>Data Locality</li>
063 * <li>Memstore Sizes</li>
064 * <li>Storefile Sizes</li>
065 * </ul>
066 * <p>
067 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best
068 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are
069 * scaled by their respective multipliers:
070 * </p>
071 * <ul>
072 * <li>hbase.master.balancer.stochastic.regionLoadCost</li>
073 * <li>hbase.master.balancer.stochastic.moveCost</li>
074 * <li>hbase.master.balancer.stochastic.tableLoadCost</li>
075 * <li>hbase.master.balancer.stochastic.localityCost</li>
076 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li>
077 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
078 * </ul>
079 * <p>
 * You can also add custom cost functions by setting the following configuration value:
081 * </p>
082 * <ul>
083 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
084 * </ul>
085 * <p>
 * All custom cost functions need to extend {@link CostFunction}.
087 * </p>
088 * <p>
089 * In addition to the above configurations, the balancer can be tuned by the following configuration
090 * values:
091 * </p>
092 * <ul>
093 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions
094 * that can be moved in a single invocation of this balancer.</li>
095 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of
096 * regions is multiplied to try and get the number of times the balancer will mutate all
097 * servers.</li>
 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the
 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and
 * the above computation, or the maximum of the two when
 * hbase.master.balancer.stochastic.runMaxSteps is set to true.</li>
101 * </ul>
102 * <p>
103 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the
104 * balancer gets the full picture of all loads on the cluster.
105 * </p>
106 */
107@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
108public class StochasticLoadBalancer extends BaseLoadBalancer {
109
  private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);

  // Configuration keys and their defaults. Unless noted otherwise, loadConf(Configuration) reads
  // each key into the instance field of the matching name.

  /** Key for the per-region step factor that calculateMaxSteps multiplies into its result. */
  protected static final String STEPS_PER_REGION_KEY =
    "hbase.master.balancer.stochastic.stepsPerRegion";
  protected static final int DEFAULT_STEPS_PER_REGION = 800;
  /** Key for the step limit that balanceTable compares with the calculated step count. */
  protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
  protected static final int DEFAULT_MAX_STEPS = 1000000;
  /**
   * Key for the flag that makes balanceTable take the larger, rather than the smaller, of maxSteps
   * and the calculated step count.
   */
  protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps";
  protected static final boolean DEFAULT_RUN_MAX_STEPS = false;
  /** Key for the time budget, in milliseconds, after which balanceTable stops its walk. */
  protected static final String MAX_RUNNING_TIME_KEY =
    "hbase.master.balancer.stochastic.maxRunningTime";
  protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds.
  /** Key read into {@code numRegionLoadsToRemember}. */
  protected static final String KEEP_REGION_LOADS =
    "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
  protected static final int DEFAULT_KEEP_REGION_LOADS = 15;
  // NOTE(review): not referenced in the part of the file reviewed here - confirm it is still used.
  private static final String TABLE_FUNCTION_SEP = "_";
  /** Key for the weighted-average cost below which needsBalance reports "no balancing needed". */
  protected static final String MIN_COST_NEED_BALANCE_KEY =
    "hbase.master.balancer.stochastic.minCostNeedBalance";
  protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f;
  /** Key listing the class names of the custom cost functions loaded on top of the built-ins. */
  protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
    "hbase.master.balancer.stochastic.additionalCostFunctions";
  /** Function name under which the overall cost is reported to the balancer metrics. */
  public static final String OVERALL_COST_FUNCTION_NAME = "Overall";
132
133  Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>();
134
135  // values are defaults
136  private int maxSteps = DEFAULT_MAX_STEPS;
137  private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS;
138  private int stepsPerRegion = DEFAULT_STEPS_PER_REGION;
139  private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
140  private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
141  private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
142  private boolean isBalancerDecisionRecording = false;
143  private boolean isBalancerRejectionRecording = false;
144
145  protected List<CandidateGenerator> candidateGenerators;
146  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();
147
148  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
149                                              // IS2_INCONSISTENT_SYNC
150
  /**
   * Identifies the built-in candidate generators. createCandidateGenerators() uses each constant's
   * {@code ordinal()} as the list index of the matching generator, so the declaration order here
   * must stay in step with that method.
   */
  public enum GeneratorType {
    RANDOM,
    LOAD,
    LOCALITY,
    RACK
  }
157
158  private double[] weightsOfGenerators;
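  // Sum of the multipliers of the cost functions that are currently needed; balanceTable()
  // recomputes it on every run. There is no initializer here, so the Java default of 0 applies
  // until it is first assigned.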
160  private float sumMultiplier;
161  // to save and report costs to JMX
162  private double curOverallCost = 0d;
163  private double[] tempFunctionCosts;
164  private double[] curFunctionCosts;
165
166  // Keep locality based picker and cost function to alert them
167  // when new services are offered
168  private LocalityBasedCandidateGenerator localityCandidateGenerator;
169  private ServerLocalityCostFunction localityCost;
170  private RackLocalityCostFunction rackLocalityCost;
171  private RegionReplicaHostCostFunction regionReplicaHostCostFunction;
172  private RegionReplicaRackCostFunction regionReplicaRackCostFunction;
173
174  /**
175   * Use to add balancer decision history to ring-buffer
176   */
177  NamedQueueRecorder namedQueueRecorder;
178
  /**
   * Creates a balancer that hands a fresh {@link MetricsStochasticBalancer} to the
   * {@link BaseLoadBalancer} constructor, replacing the default metrics object of the base class.
   */
  public StochasticLoadBalancer() {
    super(new MetricsStochasticBalancer());
  }
186
  /**
   * Creates a balancer that reports to the supplied metrics object. Restricted to test code.
   * @param metricsStochasticBalancer metrics object passed on to the base class constructor
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) {
    super(metricsStochasticBalancer);
  }
192
193  private static CostFunction createCostFunction(Class<? extends CostFunction> clazz,
194    Configuration conf) {
195    try {
196      Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class);
197      return ReflectionUtils.instantiate(clazz.getName(), ctor, conf);
198    } catch (NoSuchMethodException e) {
199      // will try construct with no parameter
200    }
201    return ReflectionUtils.newInstance(clazz);
202  }
203
204  private void loadCustomCostFunctions(Configuration conf) {
205    String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);
206
207    if (null == functionsNames) {
208      return;
209    }
210    for (String className : functionsNames) {
211      Class<? extends CostFunction> clazz;
212      try {
213        clazz = Class.forName(className).asSubclass(CostFunction.class);
214      } catch (ClassNotFoundException e) {
215        LOG.warn("Cannot load class '{}': {}", className, e.getMessage());
216        continue;
217      }
218      CostFunction func = createCostFunction(clazz, conf);
219      LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
220      costFunctions.add(func);
221    }
222  }
223
  /**
   * Returns the live list of candidate generators held by this balancer (no copy is made).
   * Restricted to test code.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  List<CandidateGenerator> getCandidateGenerators() {
    return this.candidateGenerators;
  }
229
230  protected List<CandidateGenerator> createCandidateGenerators() {
231    List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4);
232    candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator());
233    candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator());
234    candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator);
235    candidateGenerators.add(GeneratorType.RACK.ordinal(),
236      new RegionReplicaRackCandidateGenerator());
237    return candidateGenerators;
238  }
239
240  protected List<CostFunction> createCostFunctions(Configuration conf) {
241    List<CostFunction> costFunctions = new ArrayList<>();
242    addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf));
243    addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf));
244    addCostFunction(costFunctions, new MoveCostFunction(conf));
245    addCostFunction(costFunctions, localityCost);
246    addCostFunction(costFunctions, rackLocalityCost);
247    addCostFunction(costFunctions, new TableSkewCostFunction(conf));
248    addCostFunction(costFunctions, regionReplicaHostCostFunction);
249    addCostFunction(costFunctions, regionReplicaRackCostFunction);
250    addCostFunction(costFunctions, new ReadRequestCostFunction(conf));
251    addCostFunction(costFunctions, new WriteRequestCostFunction(conf));
252    addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf));
253    addCostFunction(costFunctions, new StoreFileCostFunction(conf));
254    return costFunctions;
255  }
256
  /**
   * Reloads the tunables of this balancer from {@code conf} and rebuilds the candidate generators,
   * the cost functions (built-in plus custom) and the per-function cost buffers.
   * @param conf configuration to read the balancer settings from
   */
  @Override
  protected void loadConf(Configuration conf) {
    super.loadConf(conf);
    maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS);
    stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION);
    maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME);
    runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS);

    numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS);
    minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE);
    // These fields must be assigned before the create*() calls below, which read them.
    localityCandidateGenerator = new LocalityBasedCandidateGenerator();
    localityCost = new ServerLocalityCostFunction(conf);
    rackLocalityCost = new RackLocalityCostFunction(conf);

    this.candidateGenerators = createCandidateGenerators();

    // Likewise read by createCostFunctions().
    regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
    regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
    this.costFunctions = createCostFunctions(conf);
    // Appends to this.costFunctions, so it has to run after the assignment above.
    loadCustomCostFunctions(conf);

    // One slot per cost function, custom ones included.
    curFunctionCosts = new double[costFunctions.size()];
    tempFunctionCosts = new double[costFunctions.size()];

    isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
      BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
    isBalancerRejectionRecording =
      conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
        BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);

    // Obtain the recorder only once, and only when at least one kind of recording is enabled.
    if (
      this.namedQueueRecorder == null
        && (isBalancerDecisionRecording || isBalancerRejectionRecording)
    ) {
      this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf);
    }

    // NOTE(review): in the code reviewed here sumMultiplier is only assigned by balanceTable(), so
    // the value logged below may be 0 or left over from an earlier run - confirm.
    LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps
      + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable="
      + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames())
      + " , sum of multiplier of cost functions = " + sumMultiplier + " etc.");
  }
299
300  @Override
301  public synchronized void updateClusterMetrics(ClusterMetrics st) {
302    super.updateClusterMetrics(st);
303    updateRegionLoad();
304
305    // update metrics size
306    try {
307      // by-table or ensemble mode
308      int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1;
309      int functionsCount = getCostFunctionNames().length;
310
311      updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall
312    } catch (Exception e) {
313      LOG.error("failed to get the size of all tables", e);
314    }
315  }
316
317  private void updateBalancerTableLoadInfo(TableName tableName,
318    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
319    RegionLocationFinder finder = null;
320    if (
321      (this.localityCost != null && this.localityCost.getMultiplier() > 0)
322        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)
323    ) {
324      finder = this.regionFinder;
325    }
326    BalancerClusterState cluster =
327      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
328
329    initCosts(cluster);
330    curOverallCost = computeCost(cluster, Double.MAX_VALUE);
331    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
332    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
333  }
334
335  @Override
336  public void
337    updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) {
338    if (isByTable) {
339      loadOfAllTable.forEach((tableName, loadOfOneTable) -> {
340        updateBalancerTableLoadInfo(tableName, loadOfOneTable);
341      });
342    } else {
343      updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME,
344        toEnsumbleTableLoad(loadOfAllTable));
345    }
346  }
347
348  /**
349   * Update the number of metrics that are reported to JMX
350   */
351  @RestrictedApi(explanation = "Should only be called in tests", link = "",
352      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
353  void updateMetricsSize(int size) {
354    if (metricsBalancer instanceof MetricsStochasticBalancer) {
355      ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size);
356    }
357  }
358
359  private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
360    regionReplicaHostCostFunction.prepare(c);
361    if (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON) {
362      return true;
363    }
364    return (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON);
365  }
366
367  @RestrictedApi(explanation = "Should only be called in tests", link = "",
368      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
369  boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
370    ClusterLoadState cs = new ClusterLoadState(cluster.clusterState);
371    if (cs.getNumServers() < MIN_SERVER_BALANCE) {
372      LOG.info(
373        "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)");
374      sendRejectionReasonToRingBuffer("The number of RegionServers " + cs.getNumServers()
375        + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null);
376      return false;
377    }
378    if (areSomeRegionReplicasColocated(cluster)) {
379      LOG.info("Running balancer because at least one server hosts replicas of the same region."
380        + " function cost={}", functionCost());
381      return true;
382    }
383
384    if (idleRegionServerExist(cluster)) {
385      LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}",
386        functionCost());
387      return true;
388    }
389
390    if (sloppyRegionServerExist(cs)) {
391      LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}",
392        functionCost());
393      return true;
394    }
395
396    double total = 0.0;
397    for (CostFunction c : costFunctions) {
398      if (!c.isNeeded()) {
399        LOG.trace("{} not needed", c.getClass().getSimpleName());
400        continue;
401      }
402      total += c.cost() * c.getMultiplier();
403    }
404
405    boolean balanced = (total / sumMultiplier < minCostNeedBalance);
406    if (balanced) {
407      if (isBalancerRejectionRecording) {
408        String reason = "";
409        if (total <= 0) {
410          reason =
411            "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
412        } else if (sumMultiplier <= 0) {
413          reason = "sumMultiplier = " + sumMultiplier + " <= 0";
414        } else if ((total / sumMultiplier) < minCostNeedBalance) {
415          reason =
416            "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = "
417              + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
418        }
419        sendRejectionReasonToRingBuffer(reason, costFunctions);
420      }
421      LOG.info(
422        "{} - skipping load balancing because weighted average imbalance={} <= "
423          + "threshold({}). If you want more aggressive balancing, either lower "
424          + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative "
425          + "multiplier(s) of the specific cost function(s). functionCost={}",
426        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier,
427        minCostNeedBalance, minCostNeedBalance, functionCost());
428    } else {
429      LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
430        isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime);
431    }
432    return !balanced;
433  }
434
435  @RestrictedApi(explanation = "Should only be called in tests", link = "",
436      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
437  BalanceAction nextAction(BalancerClusterState cluster) {
438    return getRandomGenerator().generate(cluster);
439  }
440
441  /**
442   * Select the candidate generator to use based on the cost of cost functions. The chance of
443   * selecting a candidate generator is propotional to the share of cost of all cost functions among
444   * all cost functions that benefit from it.
445   */
446  protected CandidateGenerator getRandomGenerator() {
447    double sum = 0;
448    for (int i = 0; i < weightsOfGenerators.length; i++) {
449      sum += weightsOfGenerators[i];
450      weightsOfGenerators[i] = sum;
451    }
452    if (sum == 0) {
453      return candidateGenerators.get(0);
454    }
455    for (int i = 0; i < weightsOfGenerators.length; i++) {
456      weightsOfGenerators[i] /= sum;
457    }
458    double rand = ThreadLocalRandom.current().nextDouble();
459    for (int i = 0; i < weightsOfGenerators.length; i++) {
460      if (rand <= weightsOfGenerators[i]) {
461        return candidateGenerators.get(i);
462      }
463    }
464    return candidateGenerators.get(candidateGenerators.size() - 1);
465  }
466
  /**
   * Replaces the rack manager used by this balancer. Restricted to test code.
   * @param rackManager the rack manager to use from now on
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  void setRackManager(RackManager rackManager) {
    this.rackManager = rackManager;
  }
472
473  private long calculateMaxSteps(BalancerClusterState cluster) {
474    return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers;
475  }
476
  /**
   * Given the cluster state this will try and approach an optimal balance. This should always
   * approach the optimal state given enough steps.
   * <p>
   * The method performs a stochastic walk: it repeatedly asks for a candidate action, applies it,
   * and keeps it only if the overall cost drops; otherwise the action is undone. The walk ends
   * when the step limit is reached or {@code maxRunningTime} has elapsed.
   * </p>
   * @param tableName      table being balanced; used for metrics and log messages
   * @param loadOfOneTable regions currently hosted by each server
   * @return the plans from balanceMasterRegions when it produces any; otherwise the region moves
   *         of the cheaper state found by the walk; {@code null} when there is nothing to balance,
   *         no needed cost function has a positive multiplier, needsBalance declines, or no
   *         cheaper state was found
   */
  @Override
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable);
    // Either master-region plans take precedence, or there are too few servers to balance
    // (in which case plans is null here).
    if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) {
      return plans;
    }

    // Leave the master out of the walk; work on a copy so the caller's map is not modified.
    if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) {
      if (loadOfOneTable.size() <= 2) {
        return null;
      }
      loadOfOneTable = new HashMap<>(loadOfOneTable);
      loadOfOneTable.remove(masterServerName);
    }

    // On clusters with lots of HFileLinks or lots of reference files,
    // instantiating the storefile infos can be quite expensive.
    // Allow turning this feature off if the locality cost is not going to
    // be used in any computations.
    RegionLocationFinder finder = null;
    if (
      (this.localityCost != null && this.localityCost.getMultiplier() > 0)
        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)
    ) {
      finder = this.regionFinder;
    }

    // The clusterState that is given to this method contains the state
    // of all the regions in the table(s) (that's true today)
    // Keep track of servers to iterate through them.
    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
      rackManager, regionCacheRatioOnOldServerMap);

    long startTime = EnvironmentEdgeManager.currentTime();

    initCosts(cluster);
    // Only cost functions that report themselves as needed contribute to the normalising sum.
    sumMultiplier = 0;
    for (CostFunction c : costFunctions) {
      if (c.isNeeded()) {
        sumMultiplier += c.getMultiplier();
      }
    }
    if (sumMultiplier <= 0) {
      LOG.error("At least one cost function needs a multiplier > 0. For example, set "
        + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default");
      return null;
    }

    // Baseline cost of the current assignment; also published to the metrics before the walk.
    double currentCost = computeCost(cluster, Double.MAX_VALUE);
    curOverallCost = currentCost;
    System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
    updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
    double initCost = currentCost;
    double newCost;

    if (!needsBalance(tableName, cluster)) {
      return null;
    }

    // runMaxSteps takes the larger of the configured and the calculated limit; otherwise the
    // smaller one is used and a warning is logged when the configured limit is the binding one.
    long computedMaxSteps;
    if (runMaxSteps) {
      computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster));
    } else {
      long calculatedMaxSteps = calculateMaxSteps(cluster);
      computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps);
      if (calculatedMaxSteps > maxSteps) {
        LOG.warn(
          "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than "
            + "maxSteps:{}. Hence load balancing may not work well. Setting parameter "
            + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue."
            + "(This config change does not require service restart)",
          calculatedMaxSteps, maxSteps);
      }
    }
    LOG.info(
      "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, "
        + "functionCost={} computedMaxSteps={}",
      currentCost / sumMultiplier, functionCost(), computedMaxSteps);

    // Captured before the walk so the decision record can show the before/after costs.
    final String initFunctionTotalCosts = totalCostsPerFunc();
    // Perform a stochastic walk to see if we can get a good fit.
    long step;

    for (step = 0; step < computedMaxSteps; step++) {
      BalanceAction action = nextAction(cluster);

      // A NULL action still counts as a step; note that the time check below is skipped for it.
      if (action.getType() == BalanceAction.Type.NULL) {
        continue;
      }

      cluster.doAction(action);
      updateCostsAndWeightsWithAction(cluster, action);

      newCost = computeCost(cluster, currentCost);

      // Should this be kept?
      if (newCost < currentCost) {
        currentCost = newCost;

        // save for JMX
        curOverallCost = currentCost;
        System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length);
      } else {
        // Put things back the way they were before.
        // TODO: undo by remembering old values
        BalanceAction undoAction = action.undoAction();
        cluster.doAction(undoAction);
        updateCostsAndWeightsWithAction(cluster, undoAction);
      }

      if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) {
        break;
      }
    }
    long endTime = EnvironmentEdgeManager.currentTime();

    metricsBalancer.balanceCluster(endTime - startTime);

    // Only a strictly cheaper end state results in region moves.
    if (initCost > currentCost) {
      updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
      plans = createRegionPlans(cluster);
      LOG.info(
        "Finished computing new moving plan. Computation took {} ms"
          + " to try {} different iterations.  Found a solution that moves "
          + "{} regions; Going from a computed imbalance of {}"
          + " to a new imbalance of {}. funtionCost={}",
        endTime - startTime, step, plans.size(), initCost / sumMultiplier,
        currentCost / sumMultiplier, functionCost());
      sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
      return plans;
    }
    LOG.info(
      "Could not find a better moving plan.  Tried {} different configurations in "
        + "{} ms, and did not find anything with an imbalance score less than {}",
      step, endTime - startTime, initCost / sumMultiplier);
    return null;
  }
619
620  private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions) {
621    if (this.isBalancerRejectionRecording) {
622      BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason);
623      if (costFunctions != null) {
624        for (CostFunction c : costFunctions) {
625          if (!c.isNeeded()) {
626            continue;
627          }
628          builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
629        }
630      }
631      namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build()));
632    }
633  }
634
635  private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost,
636    double initCost, String initFunctionTotalCosts, long step) {
637    if (this.isBalancerDecisionRecording) {
638      List<String> regionPlans = new ArrayList<>();
639      for (RegionPlan plan : plans) {
640        regionPlans
641          .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName()
642            + " , source: " + plan.getSource() + " , destination: " + plan.getDestination());
643      }
644      BalancerDecision balancerDecision = new BalancerDecision.Builder().setInitTotalCost(initCost)
645        .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost)
646        .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step)
647        .setRegionPlans(regionPlans).build();
648      namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision));
649    }
650  }
651
652  /**
653   * update costs to JMX
654   */
655  private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) {
656    if (tableName == null) {
657      return;
658    }
659
660    // check if the metricsBalancer is MetricsStochasticBalancer before casting
661    if (metricsBalancer instanceof MetricsStochasticBalancer) {
662      MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer;
663      // overall cost
664      balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME,
665        "Overall cost", overall);
666
667      // each cost function
668      for (int i = 0; i < costFunctions.size(); i++) {
669        CostFunction costFunction = costFunctions.get(i);
670        String costFunctionName = costFunction.getClass().getSimpleName();
671        double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
672        // TODO: cost function may need a specific description
673        balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName,
674          "The percent of " + costFunctionName, costPercent);
675      }
676    }
677  }
678
679  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
680    float multiplier = costFunction.getMultiplier();
681    if (multiplier > 0) {
682      costFunctions.add(costFunction);
683    }
684  }
685
686  protected String functionCost() {
687    StringBuilder builder = new StringBuilder();
688    for (CostFunction c : costFunctions) {
689      builder.append(c.getClass().getSimpleName());
690      builder.append(" : (");
691      if (c.isNeeded()) {
692        builder.append("multiplier=" + c.getMultiplier());
693        builder.append(", ");
694        double cost = c.cost();
695        builder.append("imbalance=" + cost);
696        if (cost >= minCostNeedBalance) {
697          builder.append(", need balance");
698        }
699      } else {
700        builder.append("not needed");
701      }
702      builder.append("); ");
703    }
704    return builder.toString();
705  }
706
  /**
   * Returns the live list of cost functions held by this balancer (no copy is made). Restricted
   * to test code and this class.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
  List<CostFunction> getCostFunctions() {
    return costFunctions;
  }
712
713  private String totalCostsPerFunc() {
714    StringBuilder builder = new StringBuilder();
715    for (CostFunction c : costFunctions) {
716      if (!c.isNeeded()) {
717        continue;
718      }
719      double cost = c.getMultiplier() * c.cost();
720      if (cost > 0.0) {
721        builder.append(" ");
722        builder.append(c.getClass().getSimpleName());
723        builder.append(" : ");
724        builder.append(cost);
725        builder.append(";");
726      }
727    }
728    if (builder.length() > 0) {
729      builder.deleteCharAt(builder.length() - 1);
730    }
731    return builder.toString();
732  }
733
734  /**
735   * Create all of the RegionPlan's needed to move from the initial cluster state to the desired
736   * state.
737   * @param cluster The state of the cluster
738   * @return List of RegionPlan's that represent the moves needed to get to desired final state.
739   */
740  private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) {
741    List<RegionPlan> plans = new ArrayList<>();
742    for (int regionIndex = 0; regionIndex
743        < cluster.regionIndexToServerIndex.length; regionIndex++) {
744      int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
745      int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
746
747      if (initialServerIndex != newServerIndex) {
748        RegionInfo region = cluster.regions[regionIndex];
749        ServerName initialServer = cluster.servers[initialServerIndex];
750        ServerName newServer = cluster.servers[newServerIndex];
751
752        if (LOG.isTraceEnabled()) {
753          LOG.trace("Moving Region " + region.getEncodedName() + " from server "
754            + initialServer.getHostname() + " to " + newServer.getHostname());
755        }
756        RegionPlan rp = new RegionPlan(region, initialServer, newServer);
757        plans.add(rp);
758      }
759    }
760    return plans;
761  }
762
763  /**
764   * Store the current region loads.
765   */
766  private void updateRegionLoad() {
767    // We create a new hashmap so that regions that are no longer there are removed.
768    // However we temporarily need the old loads so we can use them to keep the rolling average.
769    Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
770    loads = new HashMap<>();
771
772    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
773      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
774        String regionNameAsString = RegionInfo.getRegionNameAsString(regionName);
775        Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString);
776        if (rLoads == null) {
777          rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1);
778        } else if (rLoads.size() >= numRegionLoadsToRemember) {
779          rLoads.remove();
780        }
781        rLoads.add(new BalancerRegionLoad(rm));
782        loads.put(regionNameAsString, rLoads);
783      });
784    });
785  }
786
787  @RestrictedApi(explanation = "Should only be called in tests", link = "",
788      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
789  void initCosts(BalancerClusterState cluster) {
790    // Initialize the weights of generator every time
791    weightsOfGenerators = new double[this.candidateGenerators.size()];
792    for (CostFunction c : costFunctions) {
793      c.prepare(cluster);
794      c.updateWeight(weightsOfGenerators);
795    }
796  }
797
798  @RestrictedApi(explanation = "Should only be called in tests", link = "",
799      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
800  void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) {
801    // Reset all the weights to 0
802    for (int i = 0; i < weightsOfGenerators.length; i++) {
803      weightsOfGenerators[i] = 0;
804    }
805    for (CostFunction c : costFunctions) {
806      if (c.isNeeded()) {
807        c.postAction(action);
808        c.updateWeight(weightsOfGenerators);
809      }
810    }
811  }
812
813  /**
814   * Get the names of the cost functions
815   */
816  @RestrictedApi(explanation = "Should only be called in tests", link = "",
817      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
818  String[] getCostFunctionNames() {
819    String[] ret = new String[costFunctions.size()];
820    for (int i = 0; i < costFunctions.size(); i++) {
821      CostFunction c = costFunctions.get(i);
822      ret[i] = c.getClass().getSimpleName();
823    }
824
825    return ret;
826  }
827
828  /**
829   * This is the main cost function. It will compute a cost associated with a proposed cluster
830   * state. All different costs will be combined with their multipliers to produce a double cost.
831   * @param cluster      The state of the cluster
832   * @param previousCost the previous cost. This is used as an early out.
833   * @return a double of a cost associated with the proposed cluster state. This cost is an
834   *         aggregate of all individual cost functions.
835   */
836  @RestrictedApi(explanation = "Should only be called in tests", link = "",
837      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
838  double computeCost(BalancerClusterState cluster, double previousCost) {
839    double total = 0;
840
841    for (int i = 0; i < costFunctions.size(); i++) {
842      CostFunction c = costFunctions.get(i);
843      this.tempFunctionCosts[i] = 0.0;
844
845      if (!c.isNeeded()) {
846        continue;
847      }
848
849      Float multiplier = c.getMultiplier();
850      double cost = c.cost();
851
852      this.tempFunctionCosts[i] = multiplier * cost;
853      total += this.tempFunctionCosts[i];
854
855      if (total > previousCost) {
856        break;
857      }
858    }
859
860    return total;
861  }
862
863  /**
864   * A helper function to compose the attribute name from tablename and costfunction name
865   */
866  static String composeAttributeName(String tableName, String costFunctionName) {
867    return tableName + TABLE_FUNCTION_SEP + costFunctionName;
868  }
869}