/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns
 * regions based on the amount they are cached on a given server. A region can move across the
 * region servers whenever a region server shuts down or crashes. The region server preserves the
 * cache periodically and restores the cache when it is restarted. This balancer implements a
 * mechanism where it maintains the amount by which a region is cached on a region server. During
 * a balancer run, a region plan is generated that takes this cache information into account and
 * tries to move the regions so that the cache is minimally impacted.
 */
@InterfaceAudience.Private
public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);

  /**
   * A partially cached region is moved back to its old server when the quotient of its cache
   * ratio on the current server and its cache ratio on the old server is below this value.
   */
  private static final float CACHE_RATIO_DIFF_THRESHOLD = 0.6f;

  private Configuration configuration;

  /**
   * Identifies the candidate generators of this balancer. The ordinal of each constant is the
   * position of the matching generator in the list built by {@link #createCandidateGenerators()}
   * and the slot the cost functions add to in {@code updateWeight(double[])}, so the declaration
   * order must not change.
   */
  public enum GeneratorFunctionType {
    LOAD,
    CACHE_RATIO
  }

  /**
   * Remembers the configuration, resets the list of cost functions and then delegates to the
   * parent implementation.
   * @param configuration the configuration to load
   */
  @Override
  public synchronized void loadConf(Configuration configuration) {
    this.configuration = configuration;
    this.costFunctions = new ArrayList<>();
    super.loadConf(configuration);
  }

  /**
   * Creates the two candidate generators of this balancer, each stored at the index given by the
   * ordinal of its {@link GeneratorFunctionType}.
   * @return the list of candidate generators
   */
  @Override
  protected List<CandidateGenerator> createCandidateGenerators() {
    List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
    candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
      new CacheAwareSkewnessCandidateGenerator());
    candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
      new CacheAwareCandidateGenerator());
    return candidateGenerators;
  }

  /**
   * Creates the cost functions of this balancer. A cost function whose multiplier is not positive
   * is left out of the returned list.
   * @param configuration the configuration the cost functions read their multipliers from
   * @return the enabled cost functions
   */
  @Override
  protected List<CostFunction> createCostFunctions(Configuration configuration) {
    List<CostFunction> costFunctions = new ArrayList<>();
    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
    return costFunctions;
  }

  /** Appends {@code costFunction} to {@code costFunctions} only if its multiplier is positive. */
  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
    if (costFunction.getMultiplier() > 0) {
      costFunctions.add(costFunction);
    }
  }

  /**
   * Stores the given cluster metrics and rebuilds the region load and cache ratio maps from them.
   * @param clusterMetrics the latest cluster metrics
   */
  @Override
  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
    this.clusterStatus = clusterMetrics;
    updateRegionLoad();
  }

  /**
   * Collect the amount of region cached for all the regions from all the active region servers.
   * Both {@code loads} and {@code regionCacheRatioOnOldServerMap} are replaced by fresh maps.
   */
  private void updateRegionLoad() {
    loads = new HashMap<>();
    regionCacheRatioOnOldServerMap = new HashMap<>();
    // Encoded region name -> (server currently reporting the region, region size in MB)
    Map<String, Pair<ServerName, Integer>> currentHostAndSizeMB = new HashMap<>();

    // Build current region cache statistics
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      // Create a map of region and the server where it is currently hosted
      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
        String regionEncodedName = RegionInfo.encodeRegionName(regionName);

        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();

        // Get the size of this region in MB, truncated to an int
        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);

        rload.add(new BalancerRegionLoad(rm));
        // Maintain a map of region and its total size. This is needed to calculate the cache
        // ratios for the regions cached on old region servers
        currentHostAndSizeMB.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
        loads.put(regionEncodedName, rload);
      });
    });

    // Build cache statistics for the regions hosted previously on old region servers
    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
        // Only regions that some live server currently reports, and that this server caches
        // without being that server, count as "cached on an old server".
        Pair<ServerName, Integer> hostAndSize = currentHostAndSizeMB.get(regionEncodedName);
        if (hostAndSize == null || ServerName.isSameAddress(hostAndSize.getFirst(), sn)) {
          return;
        }
        int regionSizeMB = hostAndSize.getSecond();
        // Guard against a division by zero for regions reported with a size of 0 MB
        float regionCacheRatioOnOldServer =
          regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
        regionCacheRatioOnOldServerMap.put(regionEncodedName,
          new Pair<>(sn, regionCacheRatioOnOldServer));
      });
    });
  }

  /**
   * Looks up a region of the cluster by its encoded name with a linear scan over
   * {@code cluster.regions}.
   * @param cluster    the cluster state to search
   * @param regionName the encoded region name
   * @return the first matching region, or {@code null} if there is none
   */
  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
    Optional<RegionInfo> match = Arrays.stream(cluster.regions)
      .filter((RegionInfo ri) -> regionName.equals(ri.getEncodedName())).findFirst();
    return match.orElse(null);
  }

  /**
   * Candidate generator that proposes moving a region back to the server it was previously cached
   * on. Every call consumes one entry of {@code regionCacheRatioOnOldServerMap}.
   */
  private class CacheAwareCandidateGenerator extends CandidateGenerator {
    /**
     * Takes one entry from {@code regionCacheRatioOnOldServerMap} and decides whether the region
     * should be moved back to its old server. The entry is removed on every path.
     * @param cluster the cluster state to generate the action for
     * @return a move to the old server, or {@link BalanceAction#NULL_ACTION} if the map is empty,
     *         the region or the old server is unknown to {@code cluster}, the region belongs to
     *         meta or a system table, or the cache ratios do not justify a move
     */
    @Override
    protected BalanceAction generate(BalancerClusterState cluster) {
      // Move the regions to the servers they were previously hosted on based on the cache ratio
      if (regionCacheRatioOnOldServerMap.isEmpty()) {
        return BalanceAction.NULL_ACTION;
      }
      Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioEntry =
        regionCacheRatioOnOldServerMap.entrySet().iterator().next();
      String regionEncodedName = regionCacheRatioEntry.getKey();
      // Get the server where this region was previously hosted
      ServerName oldServer = regionCacheRatioEntry.getValue().getFirst();

      RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
      if (regionInfo == null) {
        LOG.warn("Region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }
      int regionIndex = cluster.regionsToIndex.get(regionInfo);
      // Map a server that is unknown to the cluster to -1 so that the check below can handle it;
      // assigning the result of a plain get() to an int would fail on unboxing a null instead.
      int oldServerIndex = cluster.serversToIndex.getOrDefault(oldServer.getAddress(), -1);
      if (oldServerIndex < 0) {
        LOG.warn("Server previously hosting region {} not found", regionEncodedName);
        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
        return BalanceAction.NULL_ACTION;
      }

      // NOTE(review): the entry is intentionally removed only after both cache ratios have been
      // read; the cluster state may consult the same map when computing them - confirm before
      // reordering.
      float oldRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
      int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
      float currentRegionCacheRatio =
        cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);

      BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
        currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
      regionCacheRatioOnOldServerMap.remove(regionEncodedName);
      return action;
    }

    /**
     * Builds the action for a region: a move from the current to the old server if
     * {@link #moveRegionToOldServer} approves it, {@link BalanceAction#NULL_ACTION} otherwise.
     */
    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
          : BalanceAction.NULL_ACTION;
    }

    /**
     * Decides whether a region should move back to its old server. The region moves if it is
     * fully cached on the old server, if it is cached equally on both servers, or if its cache
     * ratio on the current server divided by the one on the old server is below
     * {@link #CACHE_RATIO_DIFF_THRESHOLD}.
     * @param cluster                   the cluster state, used for log messages only
     * @param regionIndex               index of the region, used for log messages only
     * @param currentServerIndex        index of the server currently hosting the region
     * @param cacheRatioOnCurrentServer cache ratio of the region on the current server
     * @param oldServerIndex            index of the server that previously hosted the region
     * @param cacheRatioOnOldServer     cache ratio of the region on the old server
     * @return {@code true} if the region should be moved to the old server
     */
    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
      float cacheRatioOnOldServer) {
      // Nothing can be moved if either the current or the old server index is invalid
      if (currentServerIndex < 0 || oldServerIndex < 0) {
        return false;
      }

      // Conditions for moving the region

      // If the region is fully cached on the old server, move the region back
      if (cacheRatioOnOldServer == 1.0f) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
        }
        return true;
      }

      // Move the region back to the old server if it is cached equally on both the servers
      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
        }
        return true;
      }

      // If the region is not fully cached on either of the servers, move the region back to the
      // old server if the region cache ratio on the current server is still much less than the old
      // server
      if (
        cacheRatioOnOldServer > 0.0f
          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < CACHE_RATIO_DIFF_THRESHOLD
      ) {
        if (LOG.isDebugEnabled()) {
          // The message names the old server's ratio first and the current one second
          LOG.debug(
            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
              + "cache ratio {}",
            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
        }
        return true;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
          cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
      }
      return false;
    }
  }

  /**
   * Load based candidate generator. As long as {@code regionCacheRatioOnOldServerMap} has entries
   * it proposes moving those regions back to their old servers; afterwards it proposes moving the
   * least cached region of the given server.
   */
  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
    /**
     * Picks the region to move between two servers.
     * @param cluster     the cluster state
     * @param thisServer  index of the server to move a region away from; ignored while
     *                    {@code regionCacheRatioOnOldServerMap} still has entries
     * @param otherServer index of the server to move the region to; ignored while
     *                    {@code regionCacheRatioOnOldServerMap} still has entries
     * @return the move action, or {@link BalanceAction#NULL_ACTION} if no region can be moved
     */
    @Override
    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
      // First move all the regions which were hosted previously on some other server back to their
      // old servers
      if (!regionCacheRatioOnOldServerMap.isEmpty()) {
        // Get the first region in the historical cache ratio map
        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
        String regionEncodedName = regionEntry.getKey();
        // Read the value now: the entry must not be used once the map has been modified
        ServerName oldServer = regionEntry.getValue().getFirst();

        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
        if (regionInfo == null) {
          LOG.warn("Region {} does not exist", regionEncodedName);
          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
          return BalanceAction.NULL_ACTION;
        }
        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
          return BalanceAction.NULL_ACTION;
        }

        int regionIndex = cluster.regionsToIndex.get(regionInfo);

        // Get the index of the server currently hosting this region
        thisServer = cluster.regionIndexToServerIndex[regionIndex];

        // Get the old server index; a server that is unknown to the cluster is mapped to -1
        // instead of failing on unboxing a null
        otherServer = cluster.serversToIndex.getOrDefault(oldServer.getAddress(), -1);

        regionCacheRatioOnOldServerMap.remove(regionEncodedName);

        if (otherServer < 0) {
          // The old server has been moved to other host and hence, the region cannot be moved back
          // to the old server
          if (LOG.isDebugEnabled()) {
            LOG.debug(
              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
                + "server {} as the server does not exist",
              regionEncodedName, oldServer.getHostname());
          }
          return BalanceAction.NULL_ACTION;
        }

        if (LOG.isDebugEnabled()) {
          LOG.debug(
            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
              + "was hosted their earlier",
            regionEncodedName, cluster.servers[thisServer].getHostname(),
            cluster.servers[otherServer].getHostname());
        }

        return getAction(thisServer, regionIndex, otherServer, -1);
      }

      if (thisServer < 0 || otherServer < 0) {
        return BalanceAction.NULL_ACTION;
      }

      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
      if (regionIndexToMove < 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
        }
        return BalanceAction.NULL_ACTION;
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
            + "least cached on current server",
          cluster.regions[regionIndexToMove].getEncodedName(),
          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
      }
      return getAction(thisServer, regionIndexToMove, otherServer, -1);
    }

    /**
     * Finds the region of a server with the lowest cache ratio on that server. On a tie the
     * region that comes first in {@code cluster.regionsPerServer[thisServer]} wins.
     * @param cluster    the cluster state
     * @param thisServer index of the server whose regions are examined
     * @return the index of the least cached region, or -1 if no region was selected (for
     *         instance because the server hosts no regions)
     */
    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
      float minCacheRatio = Float.MAX_VALUE;
      int leastCachedRegion = -1;
      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
        int regionIndex = cluster.regionsPerServer[thisServer][i];

        float cacheRatioOnCurrentServer =
          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
        if (cacheRatioOnCurrentServer < minCacheRatio) {
          minCacheRatio = cacheRatioOnCurrentServer;
          leastCachedRegion = regionIndex;
        }
      }
      return leastCachedRegion;
    }
  }

  /**
   * Cost function based on the number of regions hosted by each server. Its multiplier is read
   * from {@link #REGION_COUNT_SKEW_COST_KEY}.
   */
  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
    static final String REGION_COUNT_SKEW_COST_KEY =
      "hbase.master.balancer.stochastic.regionCountCost";
    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
    private final DoubleArrayCost cost = new DoubleArrayCost();

    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
      // Load multiplier should be the greatest as it is the most general way to balance data.
      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
    }

    /** Initializes the per-server costs with the number of regions on each server. */
    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cost.prepare(cluster.numServers);
      cost.applyCostsChange(costs -> {
        for (int i = 0; i < cluster.numServers; i++) {
          costs[i] = cluster.regionsPerServer[i].length;
        }
      });
    }

    @Override
    protected double cost() {
      return cost.cost();
    }

    /** Refreshes the region counts of the two servers involved in the move. */
    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      cost.applyCostsChange(costs -> {
        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
        costs[newServer] = cluster.regionsPerServer[newServer].length;
      });
    }

    /** Adds the current cost to the weight slot of the {@code LOAD} candidate generator. */
    @Override
    public final void updateWeight(double[] weights) {
      weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
    }
  }

  /**
   * Cost function based on how well the regions are cached on the servers hosting them, relative
   * to the servers on which they are cached best. The multiplier is forced to 0 when
   * {@code BUCKET_CACHE_PERSISTENT_PATH_KEY} is not set in the configuration.
   */
  static class CacheAwareCostFunction extends CostFunction {
    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
    // Sum of the weighted cache ratios on the current servers divided by bestCacheRatio
    private double cacheRatio;
    // Sum of the weighted cache ratios of every region on its best cached server
    private double bestCacheRatio;

    private static final float DEFAULT_CACHE_COST = 20;

    CacheAwareCostFunction(Configuration conf) {
      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
      this.setMultiplier(
        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
      bestCacheRatio = 0.0;
      cacheRatio = 0.0;
    }

    /**
     * Recomputes {@code bestCacheRatio} and the normalized {@code cacheRatio} over all regions
     * of the cluster.
     */
    @Override
    void prepare(BalancerClusterState cluster) {
      super.prepare(cluster);
      cacheRatio = 0.0;
      bestCacheRatio = 0.0;

      for (int region = 0; region < cluster.numRegions; region++) {
        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
          cluster.regionIndexToServerIndex[region]);
        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
          getServerWithBestCacheRatioForRegion(region));
      }

      // With nothing cached anywhere the current placement is as good as any other: no cost
      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
      if (LOG.isDebugEnabled()) {
        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
      }
    }

    @Override
    protected double cost() {
      return scale(0, 1, 1 - cacheRatio);
    }

    /**
     * Adjusts {@code cacheRatio} by the normalized difference between the weighted cache ratio
     * of the region on the new and on the old server.
     */
    @Override
    protected void regionMoved(int region, int oldServer, int newServer) {
      double regionCacheRatioOnOldServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
      double regionCacheRatioOnNewServer =
        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
      cacheRatio += normalizedDelta;
      // Only report the update when the ratio has left the expected [0, 1] range
      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
        LOG.debug(
          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
      }
    }

    /** Returns the index of the server on which the given region is cached best. */
    private int getServerWithBestCacheRatioForRegion(int region) {
      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
    }

    /** Adds the current cost to the weight slot of the {@code CACHE_RATIO} candidate generator. */
    @Override
    public final void updateWeight(double[] weights) {
      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
    }
  }
}