001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.lang.reflect.Constructor; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ThreadLocalRandom; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.ClusterMetrics; 032import org.apache.hadoop.hbase.HBaseInterfaceAudience; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.RegionMetrics; 035import org.apache.hadoop.hbase.ServerMetrics; 036import org.apache.hadoop.hbase.ServerName; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.BalancerDecision; 039import org.apache.hadoop.hbase.client.BalancerRejection; 040import org.apache.hadoop.hbase.client.RegionInfo; 041import org.apache.hadoop.hbase.master.RackManager; 042import org.apache.hadoop.hbase.master.RegionPlan; 043import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; 044import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails; 045import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.hadoop.hbase.util.ReflectionUtils; 049import org.apache.yetus.audience.InterfaceAudience; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * <p> 055 * This is a best effort load balancer. Given a Cost function F(C) => x It will randomly try and 056 * mutate the cluster to Cprime. If F(Cprime) < F(C) then the new cluster state becomes the plan. 057 * It includes costs functions to compute the cost of: 058 * </p> 059 * <ul> 060 * <li>Region Load</li> 061 * <li>Table Load</li> 062 * <li>Data Locality</li> 063 * <li>Memstore Sizes</li> 064 * <li>Storefile Sizes</li> 065 * </ul> 066 * <p> 067 * Every cost function returns a number between 0 and 1 inclusive; where 0 is the lowest cost best 068 * solution, and 1 is the highest possible cost and the worst solution. The computed costs are 069 * scaled by their respective multipliers: 070 * </p> 071 * <ul> 072 * <li>hbase.master.balancer.stochastic.regionLoadCost</li> 073 * <li>hbase.master.balancer.stochastic.moveCost</li> 074 * <li>hbase.master.balancer.stochastic.tableLoadCost</li> 075 * <li>hbase.master.balancer.stochastic.localityCost</li> 076 * <li>hbase.master.balancer.stochastic.memstoreSizeCost</li> 077 * <li>hbase.master.balancer.stochastic.storefileSizeCost</li> 078 * </ul> 079 * <p> 080 * You can also add custom Cost function by setting the the following configuration value: 081 * </p> 082 * <ul> 083 * <li>hbase.master.balancer.stochastic.additionalCostFunctions</li> 084 * </ul> 085 * <p> 086 * All custom Cost Functions needs to extends {@link CostFunction} 087 * </p> 088 * <p> 089 * In addition to the above configurations, the balancer can be tuned by the following configuration 090 * values: 091 * </p> 092 * <ul> 093 * <li>hbase.master.balancer.stochastic.maxMoveRegions which controls what the max number of regions 094 * that can be moved in a single invocation of this balancer.</li> 095 * <li>hbase.master.balancer.stochastic.stepsPerRegion is the coefficient by which the number of 096 * regions is multiplied to try and get the number of times the balancer will mutate all 097 * servers.</li> 098 * <li>hbase.master.balancer.stochastic.maxSteps which controls the maximum number of times that the 099 * balancer will try and mutate all the servers. The balancer will use the minimum of this value and 100 * the above computation.</li> 101 * </ul> 102 * <p> 103 * This balancer is best used with hbase.master.loadbalance.bytable set to false so that the 104 * balancer gets the full picture of all loads on the cluster. 105 * </p> 106 */ 107@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 108public class StochasticLoadBalancer extends BaseLoadBalancer { 109 110 private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class); 111 112 protected static final String STEPS_PER_REGION_KEY = 113 "hbase.master.balancer.stochastic.stepsPerRegion"; 114 protected static final int DEFAULT_STEPS_PER_REGION = 800; 115 protected static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps"; 116 protected static final int DEFAULT_MAX_STEPS = 1000000; 117 protected static final String RUN_MAX_STEPS_KEY = "hbase.master.balancer.stochastic.runMaxSteps"; 118 protected static final boolean DEFAULT_RUN_MAX_STEPS = false; 119 protected static final String MAX_RUNNING_TIME_KEY = 120 "hbase.master.balancer.stochastic.maxRunningTime"; 121 protected static final long DEFAULT_MAX_RUNNING_TIME = 30 * 1000; // 30 seconds. 122 protected static final String KEEP_REGION_LOADS = 123 "hbase.master.balancer.stochastic.numRegionLoadsToRemember"; 124 protected static final int DEFAULT_KEEP_REGION_LOADS = 15; 125 private static final String TABLE_FUNCTION_SEP = "_"; 126 protected static final String MIN_COST_NEED_BALANCE_KEY = 127 "hbase.master.balancer.stochastic.minCostNeedBalance"; 128 protected static final float DEFAULT_MIN_COST_NEED_BALANCE = 0.025f; 129 protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY = 130 "hbase.master.balancer.stochastic.additionalCostFunctions"; 131 public static final String OVERALL_COST_FUNCTION_NAME = "Overall"; 132 133 Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<>(); 134 135 // values are defaults 136 private int maxSteps = DEFAULT_MAX_STEPS; 137 private boolean runMaxSteps = DEFAULT_RUN_MAX_STEPS; 138 private int stepsPerRegion = DEFAULT_STEPS_PER_REGION; 139 private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME; 140 private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS; 141 private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE; 142 private boolean isBalancerDecisionRecording = false; 143 private boolean isBalancerRejectionRecording = false; 144 145 protected List<CandidateGenerator> candidateGenerators; 146 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>(); 147 148 protected List<CostFunction> costFunctions; // FindBugs: Wants this protected; 149 // IS2_INCONSISTENT_SYNC 150 151 public enum GeneratorType { 152 RANDOM, 153 LOAD, 154 LOCALITY, 155 RACK 156 } 157 158 private double[] weightsOfGenerators; 159 // To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost 160 private float sumMultiplier; 161 // to save and report costs to JMX 162 private double curOverallCost = 0d; 163 private double[] tempFunctionCosts; 164 private double[] curFunctionCosts; 165 166 // Keep locality based picker and cost function to alert them 167 // when new services are offered 168 private LocalityBasedCandidateGenerator localityCandidateGenerator; 169 private ServerLocalityCostFunction localityCost; 170 private RackLocalityCostFunction rackLocalityCost; 171 private RegionReplicaHostCostFunction regionReplicaHostCostFunction; 172 private RegionReplicaRackCostFunction regionReplicaRackCostFunction; 173 174 /** 175 * Use to add balancer decision history to ring-buffer 176 */ 177 NamedQueueRecorder namedQueueRecorder; 178 179 /** 180 * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its 181 * default MetricsBalancer 182 */ 183 public StochasticLoadBalancer() { 184 super(new MetricsStochasticBalancer()); 185 } 186 187 @RestrictedApi(explanation = "Should only be called in tests", link = "", 188 allowedOnPath = ".*/src/test/.*") 189 public StochasticLoadBalancer(MetricsStochasticBalancer metricsStochasticBalancer) { 190 super(metricsStochasticBalancer); 191 } 192 193 private static CostFunction createCostFunction(Class<? extends CostFunction> clazz, 194 Configuration conf) { 195 try { 196 Constructor<? extends CostFunction> ctor = clazz.getDeclaredConstructor(Configuration.class); 197 return ReflectionUtils.instantiate(clazz.getName(), ctor, conf); 198 } catch (NoSuchMethodException e) { 199 // will try construct with no parameter 200 } 201 return ReflectionUtils.newInstance(clazz); 202 } 203 204 private void loadCustomCostFunctions(Configuration conf) { 205 String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY); 206 207 if (null == functionsNames) { 208 return; 209 } 210 for (String className : functionsNames) { 211 Class<? extends CostFunction> clazz; 212 try { 213 clazz = Class.forName(className).asSubclass(CostFunction.class); 214 } catch (ClassNotFoundException e) { 215 LOG.warn("Cannot load class '{}': {}", className, e.getMessage()); 216 continue; 217 } 218 CostFunction func = createCostFunction(clazz, conf); 219 LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName()); 220 costFunctions.add(func); 221 } 222 } 223 224 @RestrictedApi(explanation = "Should only be called in tests", link = "", 225 allowedOnPath = ".*/src/test/.*") 226 List<CandidateGenerator> getCandidateGenerators() { 227 return this.candidateGenerators; 228 } 229 230 protected List<CandidateGenerator> createCandidateGenerators() { 231 List<CandidateGenerator> candidateGenerators = new ArrayList<CandidateGenerator>(4); 232 candidateGenerators.add(GeneratorType.RANDOM.ordinal(), new RandomCandidateGenerator()); 233 candidateGenerators.add(GeneratorType.LOAD.ordinal(), new LoadCandidateGenerator()); 234 candidateGenerators.add(GeneratorType.LOCALITY.ordinal(), localityCandidateGenerator); 235 candidateGenerators.add(GeneratorType.RACK.ordinal(), 236 new RegionReplicaRackCandidateGenerator()); 237 return candidateGenerators; 238 } 239 240 protected List<CostFunction> createCostFunctions(Configuration conf) { 241 List<CostFunction> costFunctions = new ArrayList<>(); 242 addCostFunction(costFunctions, new RegionCountSkewCostFunction(conf)); 243 addCostFunction(costFunctions, new PrimaryRegionCountSkewCostFunction(conf)); 244 addCostFunction(costFunctions, new MoveCostFunction(conf)); 245 addCostFunction(costFunctions, localityCost); 246 addCostFunction(costFunctions, rackLocalityCost); 247 addCostFunction(costFunctions, new TableSkewCostFunction(conf)); 248 addCostFunction(costFunctions, regionReplicaHostCostFunction); 249 addCostFunction(costFunctions, regionReplicaRackCostFunction); 250 addCostFunction(costFunctions, new ReadRequestCostFunction(conf)); 251 addCostFunction(costFunctions, new WriteRequestCostFunction(conf)); 252 addCostFunction(costFunctions, new MemStoreSizeCostFunction(conf)); 253 addCostFunction(costFunctions, new StoreFileCostFunction(conf)); 254 return costFunctions; 255 } 256 257 @Override 258 protected void loadConf(Configuration conf) { 259 super.loadConf(conf); 260 maxSteps = conf.getInt(MAX_STEPS_KEY, DEFAULT_MAX_STEPS); 261 stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, DEFAULT_STEPS_PER_REGION); 262 maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, DEFAULT_MAX_RUNNING_TIME); 263 runMaxSteps = conf.getBoolean(RUN_MAX_STEPS_KEY, DEFAULT_RUN_MAX_STEPS); 264 265 numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, DEFAULT_KEEP_REGION_LOADS); 266 minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, DEFAULT_MIN_COST_NEED_BALANCE); 267 localityCandidateGenerator = new LocalityBasedCandidateGenerator(); 268 localityCost = new ServerLocalityCostFunction(conf); 269 rackLocalityCost = new RackLocalityCostFunction(conf); 270 271 this.candidateGenerators = createCandidateGenerators(); 272 273 regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); 274 regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf); 275 this.costFunctions = createCostFunctions(conf); 276 loadCustomCostFunctions(conf); 277 278 curFunctionCosts = new double[costFunctions.size()]; 279 tempFunctionCosts = new double[costFunctions.size()]; 280 281 isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, 282 BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); 283 isBalancerRejectionRecording = 284 conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, 285 BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED); 286 287 if ( 288 this.namedQueueRecorder == null 289 && (isBalancerDecisionRecording || isBalancerRejectionRecording) 290 ) { 291 this.namedQueueRecorder = NamedQueueRecorder.getInstance(conf); 292 } 293 294 LOG.info("Loaded config; maxSteps=" + maxSteps + ", runMaxSteps=" + runMaxSteps 295 + ", stepsPerRegion=" + stepsPerRegion + ", maxRunningTime=" + maxRunningTime + ", isByTable=" 296 + isByTable + ", CostFunctions=" + Arrays.toString(getCostFunctionNames()) 297 + " , sum of multiplier of cost functions = " + sumMultiplier + " etc."); 298 } 299 300 @Override 301 public synchronized void updateClusterMetrics(ClusterMetrics st) { 302 super.updateClusterMetrics(st); 303 updateRegionLoad(); 304 305 // update metrics size 306 try { 307 // by-table or ensemble mode 308 int tablesCount = isByTable ? services.getTableDescriptors().getAll().size() : 1; 309 int functionsCount = getCostFunctionNames().length; 310 311 updateMetricsSize(tablesCount * (functionsCount + 1)); // +1 for overall 312 } catch (Exception e) { 313 LOG.error("failed to get the size of all tables", e); 314 } 315 } 316 317 private void updateBalancerTableLoadInfo(TableName tableName, 318 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 319 RegionLocationFinder finder = null; 320 if ( 321 (this.localityCost != null && this.localityCost.getMultiplier() > 0) 322 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) 323 ) { 324 finder = this.regionFinder; 325 } 326 BalancerClusterState cluster = 327 new BalancerClusterState(loadOfOneTable, loads, finder, rackManager); 328 329 initCosts(cluster); 330 curOverallCost = computeCost(cluster, Double.MAX_VALUE); 331 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 332 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 333 } 334 335 @Override 336 public void 337 updateBalancerLoadInfo(Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable) { 338 if (isByTable) { 339 loadOfAllTable.forEach((tableName, loadOfOneTable) -> { 340 updateBalancerTableLoadInfo(tableName, loadOfOneTable); 341 }); 342 } else { 343 updateBalancerTableLoadInfo(HConstants.ENSEMBLE_TABLE_NAME, 344 toEnsumbleTableLoad(loadOfAllTable)); 345 } 346 } 347 348 /** 349 * Update the number of metrics that are reported to JMX 350 */ 351 @RestrictedApi(explanation = "Should only be called in tests", link = "", 352 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 353 void updateMetricsSize(int size) { 354 if (metricsBalancer instanceof MetricsStochasticBalancer) { 355 ((MetricsStochasticBalancer) metricsBalancer).updateMetricsSize(size); 356 } 357 } 358 359 private boolean areSomeRegionReplicasColocated(BalancerClusterState c) { 360 regionReplicaHostCostFunction.prepare(c); 361 if (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON) { 362 return true; 363 } 364 return (Math.abs(regionReplicaHostCostFunction.cost()) > CostFunction.COST_EPSILON); 365 } 366 367 @RestrictedApi(explanation = "Should only be called in tests", link = "", 368 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 369 boolean needsBalance(TableName tableName, BalancerClusterState cluster) { 370 ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); 371 if (cs.getNumServers() < MIN_SERVER_BALANCE) { 372 LOG.info( 373 "Not running balancer because only " + cs.getNumServers() + " active regionserver(s)"); 374 sendRejectionReasonToRingBuffer("The number of RegionServers " + cs.getNumServers() 375 + " < MIN_SERVER_BALANCE(" + MIN_SERVER_BALANCE + ")", null); 376 return false; 377 } 378 if (areSomeRegionReplicasColocated(cluster)) { 379 LOG.info("Running balancer because at least one server hosts replicas of the same region." 380 + " function cost={}", functionCost()); 381 return true; 382 } 383 384 if (idleRegionServerExist(cluster)) { 385 LOG.info("Running balancer because cluster has idle server(s)." + " function cost={}", 386 functionCost()); 387 return true; 388 } 389 390 if (sloppyRegionServerExist(cs)) { 391 LOG.info("Running balancer because cluster has sloppy server(s)." + " function cost={}", 392 functionCost()); 393 return true; 394 } 395 396 double total = 0.0; 397 for (CostFunction c : costFunctions) { 398 if (!c.isNeeded()) { 399 LOG.trace("{} not needed", c.getClass().getSimpleName()); 400 continue; 401 } 402 total += c.cost() * c.getMultiplier(); 403 } 404 405 boolean balanced = (total / sumMultiplier < minCostNeedBalance); 406 if (balanced) { 407 if (isBalancerRejectionRecording) { 408 String reason = ""; 409 if (total <= 0) { 410 reason = 411 "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0"; 412 } else if (sumMultiplier <= 0) { 413 reason = "sumMultiplier = " + sumMultiplier + " <= 0"; 414 } else if ((total / sumMultiplier) < minCostNeedBalance) { 415 reason = 416 "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " 417 + (total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")"; 418 } 419 sendRejectionReasonToRingBuffer(reason, costFunctions); 420 } 421 LOG.info( 422 "{} - skipping load balancing because weighted average imbalance={} <= " 423 + "threshold({}). If you want more aggressive balancing, either lower " 424 + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " 425 + "multiplier(s) of the specific cost function(s). functionCost={}", 426 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, 427 minCostNeedBalance, minCostNeedBalance, functionCost()); 428 } else { 429 LOG.info("{} - Calculating plan. may take up to {}ms to complete.", 430 isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); 431 } 432 return !balanced; 433 } 434 435 @RestrictedApi(explanation = "Should only be called in tests", link = "", 436 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 437 BalanceAction nextAction(BalancerClusterState cluster) { 438 return getRandomGenerator().generate(cluster); 439 } 440 441 /** 442 * Select the candidate generator to use based on the cost of cost functions. The chance of 443 * selecting a candidate generator is propotional to the share of cost of all cost functions among 444 * all cost functions that benefit from it. 445 */ 446 protected CandidateGenerator getRandomGenerator() { 447 double sum = 0; 448 for (int i = 0; i < weightsOfGenerators.length; i++) { 449 sum += weightsOfGenerators[i]; 450 weightsOfGenerators[i] = sum; 451 } 452 if (sum == 0) { 453 return candidateGenerators.get(0); 454 } 455 for (int i = 0; i < weightsOfGenerators.length; i++) { 456 weightsOfGenerators[i] /= sum; 457 } 458 double rand = ThreadLocalRandom.current().nextDouble(); 459 for (int i = 0; i < weightsOfGenerators.length; i++) { 460 if (rand <= weightsOfGenerators[i]) { 461 return candidateGenerators.get(i); 462 } 463 } 464 return candidateGenerators.get(candidateGenerators.size() - 1); 465 } 466 467 @RestrictedApi(explanation = "Should only be called in tests", link = "", 468 allowedOnPath = ".*/src/test/.*") 469 void setRackManager(RackManager rackManager) { 470 this.rackManager = rackManager; 471 } 472 473 private long calculateMaxSteps(BalancerClusterState cluster) { 474 return (long) cluster.numRegions * (long) this.stepsPerRegion * (long) cluster.numServers; 475 } 476 477 /** 478 * Given the cluster state this will try and approach an optimal balance. This should always 479 * approach the optimal state given enough steps. 480 */ 481 @Override 482 protected List<RegionPlan> balanceTable(TableName tableName, 483 Map<ServerName, List<RegionInfo>> loadOfOneTable) { 484 List<RegionPlan> plans = balanceMasterRegions(loadOfOneTable); 485 if (plans != null || loadOfOneTable == null || loadOfOneTable.size() <= 1) { 486 return plans; 487 } 488 489 if (masterServerName != null && loadOfOneTable.containsKey(masterServerName)) { 490 if (loadOfOneTable.size() <= 2) { 491 return null; 492 } 493 loadOfOneTable = new HashMap<>(loadOfOneTable); 494 loadOfOneTable.remove(masterServerName); 495 } 496 497 // On clusters with lots of HFileLinks or lots of reference files, 498 // instantiating the storefile infos can be quite expensive. 499 // Allow turning this feature off if the locality cost is not going to 500 // be used in any computations. 501 RegionLocationFinder finder = null; 502 if ( 503 (this.localityCost != null && this.localityCost.getMultiplier() > 0) 504 || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) 505 ) { 506 finder = this.regionFinder; 507 } 508 509 // The clusterState that is given to this method contains the state 510 // of all the regions in the table(s) (that's true today) 511 // Keep track of servers to iterate through them. 512 BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder, 513 rackManager, regionCacheRatioOnOldServerMap); 514 515 long startTime = EnvironmentEdgeManager.currentTime(); 516 517 initCosts(cluster); 518 sumMultiplier = 0; 519 for (CostFunction c : costFunctions) { 520 if (c.isNeeded()) { 521 sumMultiplier += c.getMultiplier(); 522 } 523 } 524 if (sumMultiplier <= 0) { 525 LOG.error("At least one cost function needs a multiplier > 0. For example, set " 526 + "hbase.master.balancer.stochastic.regionCountCost to a positive value or default"); 527 return null; 528 } 529 530 double currentCost = computeCost(cluster, Double.MAX_VALUE); 531 curOverallCost = currentCost; 532 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 533 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 534 double initCost = currentCost; 535 double newCost; 536 537 if (!needsBalance(tableName, cluster)) { 538 return null; 539 } 540 541 long computedMaxSteps; 542 if (runMaxSteps) { 543 computedMaxSteps = Math.max(this.maxSteps, calculateMaxSteps(cluster)); 544 } else { 545 long calculatedMaxSteps = calculateMaxSteps(cluster); 546 computedMaxSteps = Math.min(this.maxSteps, calculatedMaxSteps); 547 if (calculatedMaxSteps > maxSteps) { 548 LOG.warn( 549 "calculatedMaxSteps:{} for loadbalancer's stochastic walk is larger than " 550 + "maxSteps:{}. Hence load balancing may not work well. Setting parameter " 551 + "\"hbase.master.balancer.stochastic.runMaxSteps\" to true can overcome this issue." 552 + "(This config change does not require service restart)", 553 calculatedMaxSteps, maxSteps); 554 } 555 } 556 LOG.info( 557 "Start StochasticLoadBalancer.balancer, initial weighted average imbalance={}, " 558 + "functionCost={} computedMaxSteps={}", 559 currentCost / sumMultiplier, functionCost(), computedMaxSteps); 560 561 final String initFunctionTotalCosts = totalCostsPerFunc(); 562 // Perform a stochastic walk to see if we can get a good fit. 563 long step; 564 565 for (step = 0; step < computedMaxSteps; step++) { 566 BalanceAction action = nextAction(cluster); 567 568 if (action.getType() == BalanceAction.Type.NULL) { 569 continue; 570 } 571 572 cluster.doAction(action); 573 updateCostsAndWeightsWithAction(cluster, action); 574 575 newCost = computeCost(cluster, currentCost); 576 577 // Should this be kept? 578 if (newCost < currentCost) { 579 currentCost = newCost; 580 581 // save for JMX 582 curOverallCost = currentCost; 583 System.arraycopy(tempFunctionCosts, 0, curFunctionCosts, 0, curFunctionCosts.length); 584 } else { 585 // Put things back the way they were before. 586 // TODO: undo by remembering old values 587 BalanceAction undoAction = action.undoAction(); 588 cluster.doAction(undoAction); 589 updateCostsAndWeightsWithAction(cluster, undoAction); 590 } 591 592 if (EnvironmentEdgeManager.currentTime() - startTime > maxRunningTime) { 593 break; 594 } 595 } 596 long endTime = EnvironmentEdgeManager.currentTime(); 597 598 metricsBalancer.balanceCluster(endTime - startTime); 599 600 if (initCost > currentCost) { 601 updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); 602 plans = createRegionPlans(cluster); 603 LOG.info( 604 "Finished computing new moving plan. Computation took {} ms" 605 + " to try {} different iterations. Found a solution that moves " 606 + "{} regions; Going from a computed imbalance of {}" 607 + " to a new imbalance of {}. funtionCost={}", 608 endTime - startTime, step, plans.size(), initCost / sumMultiplier, 609 currentCost / sumMultiplier, functionCost()); 610 sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step); 611 return plans; 612 } 613 LOG.info( 614 "Could not find a better moving plan. Tried {} different configurations in " 615 + "{} ms, and did not find anything with an imbalance score less than {}", 616 step, endTime - startTime, initCost / sumMultiplier); 617 return null; 618 } 619 620 private void sendRejectionReasonToRingBuffer(String reason, List<CostFunction> costFunctions) { 621 if (this.isBalancerRejectionRecording) { 622 BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason); 623 if (costFunctions != null) { 624 for (CostFunction c : costFunctions) { 625 if (!c.isNeeded()) { 626 continue; 627 } 628 builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier()); 629 } 630 } 631 namedQueueRecorder.addRecord(new BalancerRejectionDetails(builder.build())); 632 } 633 } 634 635 private void sendRegionPlansToRingBuffer(List<RegionPlan> plans, double currentCost, 636 double initCost, String initFunctionTotalCosts, long step) { 637 if (this.isBalancerDecisionRecording) { 638 List<String> regionPlans = new ArrayList<>(); 639 for (RegionPlan plan : plans) { 640 regionPlans 641 .add("table: " + plan.getRegionInfo().getTable() + " , region: " + plan.getRegionName() 642 + " , source: " + plan.getSource() + " , destination: " + plan.getDestination()); 643 } 644 BalancerDecision balancerDecision = new BalancerDecision.Builder().setInitTotalCost(initCost) 645 .setInitialFunctionCosts(initFunctionTotalCosts).setComputedTotalCost(currentCost) 646 .setFinalFunctionCosts(totalCostsPerFunc()).setComputedSteps(step) 647 .setRegionPlans(regionPlans).build(); 648 namedQueueRecorder.addRecord(new BalancerDecisionDetails(balancerDecision)); 649 } 650 } 651 652 /** 653 * update costs to JMX 654 */ 655 private void updateStochasticCosts(TableName tableName, double overall, double[] subCosts) { 656 if (tableName == null) { 657 return; 658 } 659 660 // check if the metricsBalancer is MetricsStochasticBalancer before casting 661 if (metricsBalancer instanceof MetricsStochasticBalancer) { 662 MetricsStochasticBalancer balancer = (MetricsStochasticBalancer) metricsBalancer; 663 // overall cost 664 balancer.updateStochasticCost(tableName.getNameAsString(), OVERALL_COST_FUNCTION_NAME, 665 "Overall cost", overall); 666 667 // each cost function 668 for (int i = 0; i < costFunctions.size(); i++) { 669 CostFunction costFunction = costFunctions.get(i); 670 String costFunctionName = costFunction.getClass().getSimpleName(); 671 double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall); 672 // TODO: cost function may need a specific description 673 balancer.updateStochasticCost(tableName.getNameAsString(), costFunctionName, 674 "The percent of " + costFunctionName, costPercent); 675 } 676 } 677 } 678 679 private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) { 680 float multiplier = costFunction.getMultiplier(); 681 if (multiplier > 0) { 682 costFunctions.add(costFunction); 683 } 684 } 685 686 protected String functionCost() { 687 StringBuilder builder = new StringBuilder(); 688 for (CostFunction c : costFunctions) { 689 builder.append(c.getClass().getSimpleName()); 690 builder.append(" : ("); 691 if (c.isNeeded()) { 692 builder.append("multiplier=" + c.getMultiplier()); 693 builder.append(", "); 694 double cost = c.cost(); 695 builder.append("imbalance=" + cost); 696 if (cost >= minCostNeedBalance) { 697 builder.append(", need balance"); 698 } 699 } else { 700 builder.append("not needed"); 701 } 702 builder.append("); "); 703 } 704 return builder.toString(); 705 } 706 707 @RestrictedApi(explanation = "Should only be called in tests", link = "", 708 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 709 List<CostFunction> getCostFunctions() { 710 return costFunctions; 711 } 712 713 private String totalCostsPerFunc() { 714 StringBuilder builder = new StringBuilder(); 715 for (CostFunction c : costFunctions) { 716 if (!c.isNeeded()) { 717 continue; 718 } 719 double cost = c.getMultiplier() * c.cost(); 720 if (cost > 0.0) { 721 builder.append(" "); 722 builder.append(c.getClass().getSimpleName()); 723 builder.append(" : "); 724 builder.append(cost); 725 builder.append(";"); 726 } 727 } 728 if (builder.length() > 0) { 729 builder.deleteCharAt(builder.length() - 1); 730 } 731 return builder.toString(); 732 } 733 734 /** 735 * Create all of the RegionPlan's needed to move from the initial cluster state to the desired 736 * state. 737 * @param cluster The state of the cluster 738 * @return List of RegionPlan's that represent the moves needed to get to desired final state. 739 */ 740 private List<RegionPlan> createRegionPlans(BalancerClusterState cluster) { 741 List<RegionPlan> plans = new ArrayList<>(); 742 for (int regionIndex = 0; regionIndex 743 < cluster.regionIndexToServerIndex.length; regionIndex++) { 744 int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex]; 745 int newServerIndex = cluster.regionIndexToServerIndex[regionIndex]; 746 747 if (initialServerIndex != newServerIndex) { 748 RegionInfo region = cluster.regions[regionIndex]; 749 ServerName initialServer = cluster.servers[initialServerIndex]; 750 ServerName newServer = cluster.servers[newServerIndex]; 751 752 if (LOG.isTraceEnabled()) { 753 LOG.trace("Moving Region " + region.getEncodedName() + " from server " 754 + initialServer.getHostname() + " to " + newServer.getHostname()); 755 } 756 RegionPlan rp = new RegionPlan(region, initialServer, newServer); 757 plans.add(rp); 758 } 759 } 760 return plans; 761 } 762 763 /** 764 * Store the current region loads. 765 */ 766 private void updateRegionLoad() { 767 // We create a new hashmap so that regions that are no longer there are removed. 768 // However we temporarily need the old loads so we can use them to keep the rolling average. 769 Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; 770 loads = new HashMap<>(); 771 772 clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> { 773 sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> { 774 String regionNameAsString = RegionInfo.getRegionNameAsString(regionName); 775 Deque<BalancerRegionLoad> rLoads = oldLoads.get(regionNameAsString); 776 if (rLoads == null) { 777 rLoads = new ArrayDeque<>(numRegionLoadsToRemember + 1); 778 } else if (rLoads.size() >= numRegionLoadsToRemember) { 779 rLoads.remove(); 780 } 781 rLoads.add(new BalancerRegionLoad(rm)); 782 loads.put(regionNameAsString, rLoads); 783 }); 784 }); 785 } 786 787 @RestrictedApi(explanation = "Should only be called in tests", link = "", 788 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 789 void initCosts(BalancerClusterState cluster) { 790 // Initialize the weights of generator every time 791 weightsOfGenerators = new double[this.candidateGenerators.size()]; 792 for (CostFunction c : costFunctions) { 793 c.prepare(cluster); 794 c.updateWeight(weightsOfGenerators); 795 } 796 } 797 798 @RestrictedApi(explanation = "Should only be called in tests", link = "", 799 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 800 void updateCostsAndWeightsWithAction(BalancerClusterState cluster, BalanceAction action) { 801 // Reset all the weights to 0 802 for (int i = 0; i < weightsOfGenerators.length; i++) { 803 weightsOfGenerators[i] = 0; 804 } 805 for (CostFunction c : costFunctions) { 806 if (c.isNeeded()) { 807 c.postAction(action); 808 c.updateWeight(weightsOfGenerators); 809 } 810 } 811 } 812 813 /** 814 * Get the names of the cost functions 815 */ 816 @RestrictedApi(explanation = "Should only be called in tests", link = "", 817 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 818 String[] getCostFunctionNames() { 819 String[] ret = new String[costFunctions.size()]; 820 for (int i = 0; i < costFunctions.size(); i++) { 821 CostFunction c = costFunctions.get(i); 822 ret[i] = c.getClass().getSimpleName(); 823 } 824 825 return ret; 826 } 827 828 /** 829 * This is the main cost function. It will compute a cost associated with a proposed cluster 830 * state. All different costs will be combined with their multipliers to produce a double cost. 831 * @param cluster The state of the cluster 832 * @param previousCost the previous cost. This is used as an early out. 833 * @return a double of a cost associated with the proposed cluster state. This cost is an 834 * aggregate of all individual cost functions. 835 */ 836 @RestrictedApi(explanation = "Should only be called in tests", link = "", 837 allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") 838 double computeCost(BalancerClusterState cluster, double previousCost) { 839 double total = 0; 840 841 for (int i = 0; i < costFunctions.size(); i++) { 842 CostFunction c = costFunctions.get(i); 843 this.tempFunctionCosts[i] = 0.0; 844 845 if (!c.isNeeded()) { 846 continue; 847 } 848 849 Float multiplier = c.getMultiplier(); 850 double cost = c.cost(); 851 852 this.tempFunctionCosts[i] = multiplier * cost; 853 total += this.tempFunctionCosts[i]; 854 855 if (total > previousCost) { 856 break; 857 } 858 } 859 860 return total; 861 } 862 863 /** 864 * A helper function to compose the attribute name from tablename and costfunction name 865 */ 866 static String composeAttributeName(String tableName, String costFunctionName) { 867 return tableName + TABLE_FUNCTION_SEP + costFunctionName; 868 } 869}