001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import org.agrona.collections.Hashing; 030import org.agrona.collections.Int2IntCounterMap; 031import org.apache.hadoop.hbase.HDFSBlocksDistribution; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.RegionReplicaUtil; 035import org.apache.hadoop.hbase.master.RackManager; 036import org.apache.hadoop.hbase.net.Address; 037import org.apache.hadoop.hbase.util.Pair; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * An efficient array based implementation similar to ClusterState for keeping the status of the 044 * cluster in terms of region assignment and distribution. LoadBalancers, such as 045 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap 046 * manipulations are very costly, which is why this class uses mostly indexes and arrays. 047 * <p/> 048 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server 049 * topology in terms of server names, hostnames and racks. 050 */ 051@InterfaceAudience.Private 052class BalancerClusterState { 053 054 private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); 055 056 ServerName[] servers; 057 // ServerName uniquely identifies a region server. multiple RS can run on the same host 058 String[] hosts; 059 String[] racks; 060 boolean multiServersPerHost = false; // whether or not any host has more than one server 061 062 ArrayList<String> tables; 063 RegionInfo[] regions; 064 Deque<BalancerRegionLoad>[] regionLoads; 065 private RegionHDFSBlockLocationFinder regionFinder; 066 067 int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality 068 069 int[] serverIndexToHostIndex; // serverIndex -> host index 070 int[] serverIndexToRackIndex; // serverIndex -> rack index 071 072 int[][] regionsPerServer; // serverIndex -> region list 073 int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list 074 int[][] regionsPerHost; // hostIndex -> list of regions 075 int[][] regionsPerRack; // rackIndex -> region list 076 Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated 077 // replicas by primary region index 078 Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by 079 // primary region index 080 Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by 081 // primary region index 082 083 int[][] serversPerHost; // hostIndex -> list of server indexes 084 int[][] serversPerRack; // rackIndex -> list of server indexes 085 int[] regionIndexToServerIndex; // regionIndex -> serverIndex 086 int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) 087 int[] regionIndexToTableIndex; // regionIndex -> tableIndex 088 int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> tableIndex -> # regions 089 int[] numRegionsPerTable; // tableIndex -> region count 090 double[] meanRegionsPerTable; // mean region count per table 091 int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary 092 boolean hasRegionReplicas = false; // whether there is regions with replicas 093 094 Integer[] serverIndicesSortedByRegionCount; 095 Integer[] serverIndicesSortedByLocality; 096 097 Map<Address, Integer> serversToIndex; 098 Map<String, Integer> hostsToIndex; 099 Map<String, Integer> racksToIndex; 100 Map<String, Integer> tablesToIndex; 101 Map<RegionInfo, Integer> regionsToIndex; 102 float[] localityPerServer; 103 104 int numServers; 105 int numHosts; 106 int numRacks; 107 int numTables; 108 int numRegions; 109 int maxReplicas = 1; 110 111 int numMovedRegions = 0; // num moved regions from the initial configuration 112 Map<ServerName, List<RegionInfo>> clusterState; 113 114 private final RackManager rackManager; 115 // Maps region -> rackIndex -> locality of region on rack 116 private float[][] rackLocalities; 117 // Maps localityType -> region -> [server|rack]Index with highest locality 118 private int[][] regionsToMostLocalEntities; 119 // Maps region -> serverIndex -> regionCacheRatio of a region on a server 120 private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; 121 // Maps regionIndex -> serverIndex with best region cache ratio 122 private int[] regionServerIndexWithBestRegionCachedRatio; 123 // Maps regionName -> oldServerName -> cache ratio of the region on the old server 124 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; 125 126 static class DefaultRackManager extends RackManager { 127 @Override 128 public String getRack(ServerName server) { 129 return UNKNOWN_RACK; 130 } 131 } 132 133 BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 134 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 135 RackManager rackManager) { 136 this(null, clusterState, loads, regionFinder, rackManager, null); 137 } 138 139 protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 140 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder, 141 RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { 142 this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio); 143 } 144 145 @SuppressWarnings("unchecked") 146 BalancerClusterState(Collection<RegionInfo> unassignedRegions, 147 Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, 148 RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager, 149 Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { 150 if (unassignedRegions == null) { 151 unassignedRegions = Collections.emptyList(); 152 } 153 154 serversToIndex = new HashMap<>(); 155 hostsToIndex = new HashMap<>(); 156 racksToIndex = new HashMap<>(); 157 tablesToIndex = new HashMap<>(); 158 159 // TODO: We should get the list of tables from master 160 tables = new ArrayList<>(); 161 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 162 163 this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; 164 165 numRegions = 0; 166 167 List<List<Integer>> serversPerHostList = new ArrayList<>(); 168 List<List<Integer>> serversPerRackList = new ArrayList<>(); 169 this.clusterState = clusterState; 170 this.regionFinder = regionFinder; 171 172 // Use servername and port as there can be dead servers in this list. We want everything with 173 // a matching hostname and port to have the same index. 174 for (ServerName sn : clusterState.keySet()) { 175 if (sn == null) { 176 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 177 + "skipping; unassigned regions?"); 178 if (LOG.isTraceEnabled()) { 179 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 180 } 181 continue; 182 } 183 if (serversToIndex.get(sn.getAddress()) == null) { 184 serversToIndex.put(sn.getAddress(), numServers++); 185 } 186 if (!hostsToIndex.containsKey(sn.getHostname())) { 187 hostsToIndex.put(sn.getHostname(), numHosts++); 188 serversPerHostList.add(new ArrayList<>(1)); 189 } 190 191 int serverIndex = serversToIndex.get(sn.getAddress()); 192 int hostIndex = hostsToIndex.get(sn.getHostname()); 193 serversPerHostList.get(hostIndex).add(serverIndex); 194 195 String rack = this.rackManager.getRack(sn); 196 197 if (!racksToIndex.containsKey(rack)) { 198 racksToIndex.put(rack, numRacks++); 199 serversPerRackList.add(new ArrayList<>()); 200 } 201 int rackIndex = racksToIndex.get(rack); 202 serversPerRackList.get(rackIndex).add(serverIndex); 203 } 204 205 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 206 // Count how many regions there are. 207 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 208 numRegions += entry.getValue().size(); 209 } 210 numRegions += unassignedRegions.size(); 211 212 regionsToIndex = new HashMap<>(numRegions); 213 servers = new ServerName[numServers]; 214 serversPerHost = new int[numHosts][]; 215 serversPerRack = new int[numRacks][]; 216 regions = new RegionInfo[numRegions]; 217 regionIndexToServerIndex = new int[numRegions]; 218 initialRegionIndexToServerIndex = new int[numRegions]; 219 regionIndexToTableIndex = new int[numRegions]; 220 regionIndexToPrimaryIndex = new int[numRegions]; 221 regionLoads = new Deque[numRegions]; 222 223 regionLocations = new int[numRegions][]; 224 serverIndicesSortedByRegionCount = new Integer[numServers]; 225 serverIndicesSortedByLocality = new Integer[numServers]; 226 localityPerServer = new float[numServers]; 227 228 serverIndexToHostIndex = new int[numServers]; 229 serverIndexToRackIndex = new int[numServers]; 230 regionsPerServer = new int[numServers][]; 231 serverIndexToRegionsOffset = new int[numServers]; 232 regionsPerHost = new int[numHosts][]; 233 regionsPerRack = new int[numRacks][]; 234 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 235 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 236 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 237 238 int regionIndex = 0, regionPerServerIndex = 0; 239 240 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 241 if (entry.getKey() == null) { 242 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 243 continue; 244 } 245 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 246 247 // keep the servername if this is the first server name for this hostname 248 // or this servername has the newest startcode. 249 if ( 250 servers[serverIndex] == null 251 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 252 ) { 253 servers[serverIndex] = entry.getKey(); 254 } 255 256 if (regionsPerServer[serverIndex] != null) { 257 // there is another server with the same hostAndPort in ClusterState. 258 // allocate the array for the total size 259 regionsPerServer[serverIndex] = 260 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 261 } else { 262 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 263 } 264 colocatedReplicaCountsPerServer[serverIndex] = 265 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 266 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 267 serverIndicesSortedByLocality[serverIndex] = serverIndex; 268 } 269 270 hosts = new String[numHosts]; 271 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 272 hosts[entry.getValue()] = entry.getKey(); 273 } 274 racks = new String[numRacks]; 275 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 276 racks[entry.getValue()] = entry.getKey(); 277 } 278 279 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 280 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 281 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 282 283 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 284 serverIndexToHostIndex[serverIndex] = hostIndex; 285 286 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 287 serverIndexToRackIndex[serverIndex] = rackIndex; 288 289 for (RegionInfo region : entry.getValue()) { 290 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 291 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 292 regionIndex++; 293 } 294 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 295 } 296 297 for (RegionInfo region : unassignedRegions) { 298 registerRegion(region, regionIndex, -1, loads, regionFinder); 299 regionIndex++; 300 } 301 302 if (LOG.isDebugEnabled()) { 303 for (int i = 0; i < numServers; i++) { 304 LOG.debug("server {} has {} regions", i, regionsPerServer[i].length); 305 } 306 } 307 for (int i = 0; i < serversPerHostList.size(); i++) { 308 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 309 for (int j = 0; j < serversPerHost[i].length; j++) { 310 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 311 LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i); 312 } 313 if (serversPerHost[i].length > 1) { 314 multiServersPerHost = true; 315 } 316 } 317 318 for (int i = 0; i < serversPerRackList.size(); i++) { 319 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 320 for (int j = 0; j < serversPerRack[i].length; j++) { 321 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 322 LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 323 } 324 } 325 326 numTables = tables.size(); 327 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 328 numRacks); 329 numRegionsPerServerPerTable = new int[numTables][numServers]; 330 numRegionsPerTable = new int[numTables]; 331 332 for (int i = 0; i < numTables; i++) { 333 for (int j = 0; j < numServers; j++) { 334 numRegionsPerServerPerTable[i][j] = 0; 335 } 336 } 337 338 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 339 if (regionIndexToServerIndex[i] >= 0) { 340 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 341 numRegionsPerTable[regionIndexToTableIndex[i]]++; 342 } 343 } 344 345 // Avoid repeated computation for planning 346 meanRegionsPerTable = new double[numTables]; 347 348 for (int i = 0; i < numTables; i++) { 349 meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; 350 } 351 352 for (int i = 0; i < regions.length; i++) { 353 RegionInfo info = regions[i]; 354 if (RegionReplicaUtil.isDefaultReplica(info)) { 355 regionIndexToPrimaryIndex[i] = i; 356 } else { 357 hasRegionReplicas = true; 358 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 359 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 360 } 361 } 362 363 for (int i = 0; i < regionsPerServer.length; i++) { 364 colocatedReplicaCountsPerServer[i] = 365 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 366 for (int j = 0; j < regionsPerServer[i].length; j++) { 367 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 368 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 369 } 370 } 371 // compute regionsPerHost 372 if (multiServersPerHost) { 373 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 374 serversPerHost); 375 } 376 377 // compute regionsPerRack 378 if (numRacks > 1) { 379 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 380 serversPerRack); 381 } 382 } 383 384 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 385 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 386 for (int i = 0; i < serversPerLocation.length; i++) { 387 int numRegionsPerLocation = 0; 388 for (int j = 0; j < serversPerLocation[i].length; j++) { 389 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 390 } 391 regionsPerLocation[i] = new int[numRegionsPerLocation]; 392 colocatedReplicaCountsPerLocation[i] = 393 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 394 } 395 396 for (int i = 0; i < serversPerLocation.length; i++) { 397 int numRegionPerLocationIndex = 0; 398 for (int j = 0; j < serversPerLocation[i].length; j++) { 399 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 400 int region = regionsPerServer[serversPerLocation[i][j]][k]; 401 regionsPerLocation[i][numRegionPerLocationIndex] = region; 402 int primaryIndex = regionIndexToPrimaryIndex[region]; 403 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 404 numRegionPerLocationIndex++; 405 } 406 } 407 } 408 409 } 410 411 /** Helper for Cluster constructor to handle a region */ 412 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 413 Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) { 414 String tableName = region.getTable().getNameAsString(); 415 if (!tablesToIndex.containsKey(tableName)) { 416 tables.add(tableName); 417 tablesToIndex.put(tableName, tablesToIndex.size()); 418 } 419 int tableIndex = tablesToIndex.get(tableName); 420 421 regionsToIndex.put(region, regionIndex); 422 regions[regionIndex] = region; 423 regionIndexToServerIndex[regionIndex] = serverIndex; 424 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 425 regionIndexToTableIndex[regionIndex] = tableIndex; 426 427 // region load 428 if (loads != null) { 429 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 430 // That could have failed if the RegionLoad is using the other regionName 431 if (rl == null) { 432 // Try getting the region load using encoded name. 433 rl = loads.get(region.getEncodedName()); 434 } 435 regionLoads[regionIndex] = rl; 436 } 437 438 if (regionFinder != null) { 439 // region location 440 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 441 regionLocations[regionIndex] = new int[loc.size()]; 442 for (int i = 0; i < loc.size(); i++) { 443 regionLocations[regionIndex][i] = loc.get(i) == null 444 ? -1 445 : (serversToIndex.get(loc.get(i).getAddress()) == null 446 ? -1 447 : serversToIndex.get(loc.get(i).getAddress())); 448 } 449 } 450 451 int numReplicas = region.getReplicaId() + 1; 452 if (numReplicas > maxReplicas) { 453 maxReplicas = numReplicas; 454 } 455 } 456 457 /** 458 * Returns true iff a given server has less regions than the balanced amount 459 */ 460 public boolean serverHasTooFewRegions(int server) { 461 int minLoad = this.numRegions / numServers; 462 int numRegions = getNumRegions(server); 463 return numRegions < minLoad; 464 } 465 466 /** 467 * Retrieves and lazily initializes a field storing the locality of every region/server 468 * combination 469 */ 470 public float[][] getOrComputeRackLocalities() { 471 if (rackLocalities == null || regionsToMostLocalEntities == null) { 472 computeCachedLocalities(); 473 } 474 return rackLocalities; 475 } 476 477 /** 478 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 479 * the locality 480 */ 481 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 482 if (rackLocalities == null || regionsToMostLocalEntities == null) { 483 computeCachedLocalities(); 484 } 485 return regionsToMostLocalEntities[type.ordinal()]; 486 } 487 488 /** 489 * Looks up locality from cache of localities. Will create cache if it does not already exist. 490 */ 491 public float getOrComputeLocality(int region, int entity, 492 BalancerClusterState.LocalityType type) { 493 switch (type) { 494 case SERVER: 495 return getLocalityOfRegion(region, entity); 496 case RACK: 497 return getOrComputeRackLocalities()[region][entity]; 498 default: 499 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 500 } 501 } 502 503 /** 504 * Returns locality weighted by region size in MB. Will create locality cache if it does not 505 * already exist. 506 */ 507 public double getOrComputeWeightedLocality(int region, int server, 508 BalancerClusterState.LocalityType type) { 509 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 510 } 511 512 /** 513 * Returns the size in MB from the most recent RegionLoad for region 514 */ 515 public int getRegionSizeMB(int region) { 516 Deque<BalancerRegionLoad> load = regionLoads[region]; 517 // This means regions have no actual data on disk 518 if (load == null) { 519 return 0; 520 } 521 return regionLoads[region].getLast().getStorefileSizeMB(); 522 } 523 524 /** 525 * Computes and caches the locality for each region/rack combinations, as well as storing a 526 * mapping of region -> server and region -> rack such that server and rack have the highest 527 * locality for region 528 */ 529 private void computeCachedLocalities() { 530 rackLocalities = new float[numRegions][numRacks]; 531 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 532 533 // Compute localities and find most local server per region 534 for (int region = 0; region < numRegions; region++) { 535 int serverWithBestLocality = 0; 536 float bestLocalityForRegion = 0; 537 for (int server = 0; server < numServers; server++) { 538 // Aggregate per-rack locality 539 float locality = getLocalityOfRegion(region, server); 540 int rack = serverIndexToRackIndex[server]; 541 int numServersInRack = serversPerRack[rack].length; 542 rackLocalities[region][rack] += locality / numServersInRack; 543 544 if (locality > bestLocalityForRegion) { 545 serverWithBestLocality = server; 546 bestLocalityForRegion = locality; 547 } 548 } 549 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 550 551 // Find most local rack per region 552 int rackWithBestLocality = 0; 553 float bestRackLocalityForRegion = 0.0f; 554 for (int rack = 0; rack < numRacks; rack++) { 555 float rackLocality = rackLocalities[region][rack]; 556 if (rackLocality > bestRackLocalityForRegion) { 557 bestRackLocalityForRegion = rackLocality; 558 rackWithBestLocality = rack; 559 } 560 } 561 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 562 } 563 564 } 565 566 /** 567 * Returns the size of hFiles from the most recent RegionLoad for region 568 */ 569 public int getTotalRegionHFileSizeMB(int region) { 570 Deque<BalancerRegionLoad> load = regionLoads[region]; 571 if (load == null) { 572 // This means, that the region has no actual data on disk 573 return 0; 574 } 575 return regionLoads[region].getLast().getRegionSizeMB(); 576 } 577 578 /** 579 * Returns the weighted cache ratio of a region on the given region server 580 */ 581 public float getOrComputeWeightedRegionCacheRatio(int region, int server) { 582 return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); 583 } 584 585 /** 586 * Returns the amount by which a region is cached on a given region server. If the region is not 587 * currently hosted on the given region server, then find out if it was previously hosted there 588 * and return the old cache ratio. 589 */ 590 protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { 591 float regionCacheRatio = 0.0f; 592 593 // Get the current region cache ratio if the region is hosted on the server regionServerIndex 594 for (int regionIndex : regionsPerServer[regionServerIndex]) { 595 if (region != regionIndex) { 596 continue; 597 } 598 599 Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; 600 601 // The region is currently hosted on this region server. Get the region cache ratio for this 602 // region on this server 603 regionCacheRatio = 604 regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); 605 606 return regionCacheRatio; 607 } 608 609 // Region is not currently hosted on this server. Check if the region was cached on this 610 // server earlier. This can happen when the server was shutdown and the cache was persisted. 611 // Search using the region name and server name and not the index id and server id as these ids 612 // may change when a server is marked as dead or a new server is added. 613 String regionEncodedName = regions[region].getEncodedName(); 614 ServerName serverName = servers[regionServerIndex]; 615 if ( 616 regionCacheRatioOnOldServerMap != null 617 && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) 618 ) { 619 Pair<ServerName, Float> cacheRatioOfRegionOnServer = 620 regionCacheRatioOnOldServerMap.get(regionEncodedName); 621 if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { 622 regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); 623 if (LOG.isDebugEnabled()) { 624 LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, 625 serverName, regionCacheRatio); 626 } 627 } 628 } 629 return regionCacheRatio; 630 } 631 632 /** 633 * Populate the maps containing information about how much a region is cached on a region server. 634 */ 635 private void computeRegionServerRegionCacheRatio() { 636 regionIndexServerIndexRegionCachedRatio = new HashMap<>(); 637 regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; 638 639 for (int region = 0; region < numRegions; region++) { 640 float bestRegionCacheRatio = 0.0f; 641 int serverWithBestRegionCacheRatio = 0; 642 for (int server = 0; server < numServers; server++) { 643 float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); 644 if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { 645 // A region with cache ratio 0 on a server means nothing. Hence, just make a note of 646 // cache ratio only if the cache ratio is greater than 0. 647 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 648 regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); 649 } 650 if (regionCacheRatio > bestRegionCacheRatio) { 651 serverWithBestRegionCacheRatio = server; 652 // If the server currently hosting the region has equal cache ratio to a historical 653 // server, consider the current server to keep hosting the region 654 bestRegionCacheRatio = regionCacheRatio; 655 } else if ( 656 regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] 657 ) { 658 // If two servers have same region cache ratio, then the server currently hosting the 659 // region 660 // should retain the region 661 serverWithBestRegionCacheRatio = server; 662 } 663 } 664 regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; 665 Pair<Integer, Integer> regionServerPair = 666 new Pair<>(region, regionIndexToServerIndex[region]); 667 float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); 668 if (tempRegionCacheRatio > bestRegionCacheRatio) { 669 LOG.warn( 670 "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " 671 + "best region cache ratio {} on server {}", 672 regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], 673 tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); 674 } 675 } 676 } 677 678 protected float getOrComputeRegionCacheRatio(int region, int server) { 679 if ( 680 regionServerIndexWithBestRegionCachedRatio == null 681 || regionIndexServerIndexRegionCachedRatio.isEmpty() 682 ) { 683 computeRegionServerRegionCacheRatio(); 684 } 685 686 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 687 return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) 688 ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair) 689 : 0.0f; 690 } 691 692 public int[] getOrComputeServerWithBestRegionCachedRatio() { 693 if ( 694 regionServerIndexWithBestRegionCachedRatio == null 695 || regionIndexServerIndexRegionCachedRatio.isEmpty() 696 ) { 697 computeRegionServerRegionCacheRatio(); 698 } 699 return regionServerIndexWithBestRegionCachedRatio; 700 } 701 702 /** 703 * Maps region index to rack index 704 */ 705 public int getRackForRegion(int region) { 706 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 707 } 708 709 enum LocalityType { 710 SERVER, 711 RACK 712 } 713 714 public void doAction(BalanceAction action) { 715 switch (action.getType()) { 716 case NULL: 717 break; 718 case ASSIGN_REGION: 719 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 720 assert action instanceof AssignRegionAction : action.getClass(); 721 AssignRegionAction ar = (AssignRegionAction) action; 722 regionsPerServer[ar.getServer()] = 723 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 724 regionMoved(ar.getRegion(), -1, ar.getServer()); 725 break; 726 case MOVE_REGION: 727 assert action instanceof MoveRegionAction : action.getClass(); 728 MoveRegionAction mra = (MoveRegionAction) action; 729 regionsPerServer[mra.getFromServer()] = 730 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 731 regionsPerServer[mra.getToServer()] = 732 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 733 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 734 break; 735 case SWAP_REGIONS: 736 assert action instanceof SwapRegionsAction : action.getClass(); 737 SwapRegionsAction a = (SwapRegionsAction) action; 738 regionsPerServer[a.getFromServer()] = 739 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 740 regionsPerServer[a.getToServer()] = 741 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 742 regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 743 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 744 break; 745 default: 746 throw new RuntimeException("Uknown action:" + action.getType()); 747 } 748 } 749 750 /** 751 * Return true if the placement of region on server would lower the availability of the region in 752 * question 753 * @return true or false 754 */ 755 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 756 if (!serversToIndex.containsKey(serverName.getAddress())) { 757 return false; // safeguard against race between cluster.servers and servers from LB method 758 // args 759 } 760 int server = serversToIndex.get(serverName.getAddress()); 761 int region = regionsToIndex.get(regionInfo); 762 763 // Region replicas for same region should better assign to different servers 764 for (int i : regionsPerServer[server]) { 765 RegionInfo otherRegionInfo = regions[i]; 766 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 767 return true; 768 } 769 } 770 771 int primary = regionIndexToPrimaryIndex[region]; 772 if (primary == -1) { 773 return false; 774 } 775 // there is a subset relation for server < host < rack 776 // check server first 777 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 778 if (result != 0) { 779 return result > 0; 780 } 781 782 // check host 783 if (multiServersPerHost) { 784 result = checkLocationForPrimary(serverIndexToHostIndex[server], 785 colocatedReplicaCountsPerHost, primary); 786 if (result != 0) { 787 return result > 0; 788 } 789 } 790 791 // check rack 792 if (numRacks > 1) { 793 result = checkLocationForPrimary(serverIndexToRackIndex[server], 794 colocatedReplicaCountsPerRack, primary); 795 if (result != 0) { 796 return result > 0; 797 } 798 } 799 return false; 800 } 801 802 /** 803 * Common method for better solution check. 804 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 805 * colocatedReplicaCountsPerRack 806 * @return 1 for better, -1 for no better, 0 for unknown 807 */ 808 private int checkLocationForPrimary(int location, 809 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 810 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 811 // check for whether there are other Locations that we can place this region 812 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 813 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 814 return 1; // meaning there is a better Location 815 } 816 } 817 return -1; // there is not a better Location to place this 818 } 819 return 0; 820 } 821 822 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 823 if (!serversToIndex.containsKey(serverName.getAddress())) { 824 return; 825 } 826 int server = serversToIndex.get(serverName.getAddress()); 827 int region = regionsToIndex.get(regionInfo); 828 doAction(new AssignRegionAction(region, server)); 829 } 830 831 void regionMoved(int region, int oldServer, int newServer) { 832 regionIndexToServerIndex[region] = newServer; 833 if (initialRegionIndexToServerIndex[region] == newServer) { 834 numMovedRegions--; // region moved back to original location 835 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 836 numMovedRegions++; // region moved from original location 837 } 838 int tableIndex = regionIndexToTableIndex[region]; 839 if (oldServer >= 0) { 840 numRegionsPerServerPerTable[tableIndex][oldServer]--; 841 } 842 numRegionsPerServerPerTable[tableIndex][newServer]++; 843 844 // update for servers 845 int primary = regionIndexToPrimaryIndex[region]; 846 if (oldServer >= 0) { 847 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 848 } 849 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 850 851 // update for hosts 852 if (multiServersPerHost) { 853 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 854 oldServer, newServer, primary, region); 855 } 856 857 // update for racks 858 if (numRacks > 1) { 859 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 860 oldServer, newServer, primary, region); 861 } 862 } 863 864 /** 865 * Common method for per host and per Location region index updates when a region is moved. 866 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 867 * @param regionsPerLocation regionsPerHost or regionsPerLocation 868 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 869 * colocatedReplicaCountsPerRack 870 */ 871 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 872 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 873 int primary, int region) { 874 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 875 int newLocation = serverIndexToLocation[newServer]; 876 if (newLocation != oldLocation) { 877 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 878 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 879 if (oldLocation >= 0) { 880 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 881 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 882 } 883 } 884 885 } 886 887 int[] removeRegion(int[] regions, int regionIndex) { 888 // TODO: this maybe costly. Consider using linked lists 889 int[] newRegions = new int[regions.length - 1]; 890 int i = 0; 891 for (i = 0; i < regions.length; i++) { 892 if (regions[i] == regionIndex) { 893 break; 894 } 895 newRegions[i] = regions[i]; 896 } 897 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 898 return newRegions; 899 } 900 901 int[] addRegion(int[] regions, int regionIndex) { 902 int[] newRegions = new int[regions.length + 1]; 903 System.arraycopy(regions, 0, newRegions, 0, regions.length); 904 newRegions[newRegions.length - 1] = regionIndex; 905 return newRegions; 906 } 907 908 int[] addRegionSorted(int[] regions, int regionIndex) { 909 int[] newRegions = new int[regions.length + 1]; 910 int i = 0; 911 for (i = 0; i < regions.length; i++) { // find the index to insert 912 if (regions[i] > regionIndex) { 913 break; 914 } 915 } 916 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 917 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 918 newRegions[i] = regionIndex; 919 920 return newRegions; 921 } 922 923 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 924 int i = 0; 925 for (i = 0; i < regions.length; i++) { 926 if (regions[i] == regionIndex) { 927 regions[i] = newRegionIndex; 928 break; 929 } 930 } 931 return regions; 932 } 933 934 void sortServersByRegionCount() { 935 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 936 } 937 938 int getNumRegions(int server) { 939 return regionsPerServer[server].length; 940 } 941 942 boolean contains(int[] arr, int val) { 943 return Arrays.binarySearch(arr, val) >= 0; 944 } 945 946 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 947 948 public Comparator<Integer> getNumRegionsComparator() { 949 return numRegionsComparator; 950 } 951 952 int getLowestLocalityRegionOnServer(int serverIndex) { 953 if (regionFinder != null) { 954 float lowestLocality = 1.0f; 955 int lowestLocalityRegionIndex = -1; 956 if (regionsPerServer[serverIndex].length == 0) { 957 // No regions on that region server 958 return -1; 959 } 960 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 961 int regionIndex = regionsPerServer[serverIndex][j]; 962 HDFSBlocksDistribution distribution = 963 regionFinder.getBlockDistribution(regions[regionIndex]); 964 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 965 // skip empty region 966 if (distribution.getUniqueBlocksTotalWeight() == 0) { 967 continue; 968 } 969 if (locality < lowestLocality) { 970 lowestLocality = locality; 971 lowestLocalityRegionIndex = j; 972 } 973 } 974 if (lowestLocalityRegionIndex == -1) { 975 return -1; 976 } 977 if (LOG.isTraceEnabled()) { 978 LOG.trace("Lowest locality region is " 979 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 980 .getRegionNameAsString() 981 + " with locality " + lowestLocality + " and its region server contains " 982 + regionsPerServer[serverIndex].length + " regions"); 983 } 984 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 985 } else { 986 return -1; 987 } 988 } 989 990 float getLocalityOfRegion(int region, int server) { 991 if (regionFinder != null) { 992 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 993 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 994 } else { 995 return 0f; 996 } 997 } 998 999 void setNumRegions(int numRegions) { 1000 this.numRegions = numRegions; 1001 } 1002 1003 void setNumMovedRegions(int numMovedRegions) { 1004 this.numMovedRegions = numMovedRegions; 1005 } 1006 1007 @Override 1008 public String toString() { 1009 StringBuilder desc = new StringBuilder("Cluster={servers=["); 1010 for (ServerName sn : servers) { 1011 desc.append(sn.getAddress().toString()).append(", "); 1012 } 1013 desc.append("], serverIndicesSortedByRegionCount=") 1014 .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 1015 .append(Arrays.deepToString(regionsPerServer)); 1016 1017 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 1018 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 1019 .append('}'); 1020 return desc.toString(); 1021 } 1022}