001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.balancer; 019 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Comparator; 025import java.util.Deque; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import org.agrona.collections.Hashing; 030import org.agrona.collections.Int2IntCounterMap; 031import org.apache.hadoop.hbase.HDFSBlocksDistribution; 032import org.apache.hadoop.hbase.ServerName; 033import org.apache.hadoop.hbase.client.RegionInfo; 034import org.apache.hadoop.hbase.client.RegionReplicaUtil; 035import org.apache.hadoop.hbase.master.RackManager; 036import org.apache.hadoop.hbase.net.Address; 037import org.apache.hadoop.hbase.util.Pair; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042/** 043 * An efficient array based implementation similar to ClusterState for keeping the status of the 044 * cluster in terms of region assignment and distribution. LoadBalancers, such as 045 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap 046 * manipulations are very costly, which is why this class uses mostly indexes and arrays. 047 * <p/> 048 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server 049 * topology in terms of server names, hostnames and racks. 050 */ 051@InterfaceAudience.Private 052class BalancerClusterState { 053 054 private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); 055 056 ServerName[] servers; 057 // ServerName uniquely identifies a region server. multiple RS can run on the same host 058 String[] hosts; 059 String[] racks; 060 boolean multiServersPerHost = false; // whether or not any host has more than one server 061 062 ArrayList<String> tables; 063 RegionInfo[] regions; 064 Deque<BalancerRegionLoad>[] regionLoads; 065 private RegionLocationFinder regionFinder; 066 067 int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality 068 069 int[] serverIndexToHostIndex; // serverIndex -> host index 070 int[] serverIndexToRackIndex; // serverIndex -> rack index 071 072 int[][] regionsPerServer; // serverIndex -> region list 073 int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list 074 int[][] regionsPerHost; // hostIndex -> list of regions 075 int[][] regionsPerRack; // rackIndex -> region list 076 Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated 077 // replicas by primary region index 078 Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by 079 // primary region index 080 Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by 081 // primary region index 082 083 int[][] serversPerHost; // hostIndex -> list of server indexes 084 int[][] serversPerRack; // rackIndex -> list of server indexes 085 int[] regionIndexToServerIndex; // regionIndex -> serverIndex 086 int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) 087 int[] regionIndexToTableIndex; // regionIndex -> tableIndex 088 int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions 089 int[] numRegionsPerTable; // tableIndex -> region count 090 int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS 091 int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary 092 boolean hasRegionReplicas = false; // whether there is regions with replicas 093 094 Integer[] serverIndicesSortedByRegionCount; 095 Integer[] serverIndicesSortedByLocality; 096 097 Map<Address, Integer> serversToIndex; 098 Map<String, Integer> hostsToIndex; 099 Map<String, Integer> racksToIndex; 100 Map<String, Integer> tablesToIndex; 101 Map<RegionInfo, Integer> regionsToIndex; 102 float[] localityPerServer; 103 104 int numServers; 105 int numHosts; 106 int numRacks; 107 int numTables; 108 int numRegions; 109 110 int numMovedRegions = 0; // num moved regions from the initial configuration 111 Map<ServerName, List<RegionInfo>> clusterState; 112 113 private final RackManager rackManager; 114 // Maps region -> rackIndex -> locality of region on rack 115 private float[][] rackLocalities; 116 // Maps localityType -> region -> [server|rack]Index with highest locality 117 private int[][] regionsToMostLocalEntities; 118 // Maps region -> serverIndex -> regionCacheRatio of a region on a server 119 private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio; 120 // Maps regionIndex -> serverIndex with best region cache ratio 121 private int[] regionServerIndexWithBestRegionCachedRatio; 122 // Maps regionName -> oldServerName -> cache ratio of the region on the old server 123 Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; 124 125 static class DefaultRackManager extends RackManager { 126 @Override 127 public String getRack(ServerName server) { 128 return UNKNOWN_RACK; 129 } 130 } 131 132 BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 133 Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder, 134 RackManager rackManager) { 135 this(null, clusterState, loads, regionFinder, rackManager, null); 136 } 137 138 protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState, 139 Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder, 140 RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { 141 this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio); 142 } 143 144 @SuppressWarnings("unchecked") 145 BalancerClusterState(Collection<RegionInfo> unassignedRegions, 146 Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads, 147 RegionLocationFinder regionFinder, RackManager rackManager, 148 Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) { 149 if (unassignedRegions == null) { 150 unassignedRegions = Collections.emptyList(); 151 } 152 153 serversToIndex = new HashMap<>(); 154 hostsToIndex = new HashMap<>(); 155 racksToIndex = new HashMap<>(); 156 tablesToIndex = new HashMap<>(); 157 158 // TODO: We should get the list of tables from master 159 tables = new ArrayList<>(); 160 this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); 161 162 this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio; 163 164 numRegions = 0; 165 166 List<List<Integer>> serversPerHostList = new ArrayList<>(); 167 List<List<Integer>> serversPerRackList = new ArrayList<>(); 168 this.clusterState = clusterState; 169 this.regionFinder = regionFinder; 170 171 // Use servername and port as there can be dead servers in this list. We want everything with 172 // a matching hostname and port to have the same index. 173 for (ServerName sn : clusterState.keySet()) { 174 if (sn == null) { 175 LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " 176 + "skipping; unassigned regions?"); 177 if (LOG.isTraceEnabled()) { 178 LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); 179 } 180 continue; 181 } 182 if (serversToIndex.get(sn.getAddress()) == null) { 183 serversToIndex.put(sn.getAddress(), numServers++); 184 } 185 if (!hostsToIndex.containsKey(sn.getHostname())) { 186 hostsToIndex.put(sn.getHostname(), numHosts++); 187 serversPerHostList.add(new ArrayList<>(1)); 188 } 189 190 int serverIndex = serversToIndex.get(sn.getAddress()); 191 int hostIndex = hostsToIndex.get(sn.getHostname()); 192 serversPerHostList.get(hostIndex).add(serverIndex); 193 194 String rack = this.rackManager.getRack(sn); 195 if (!racksToIndex.containsKey(rack)) { 196 racksToIndex.put(rack, numRacks++); 197 serversPerRackList.add(new ArrayList<>()); 198 } 199 int rackIndex = racksToIndex.get(rack); 200 serversPerRackList.get(rackIndex).add(serverIndex); 201 } 202 LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex); 203 // Count how many regions there are. 204 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 205 numRegions += entry.getValue().size(); 206 } 207 numRegions += unassignedRegions.size(); 208 209 regionsToIndex = new HashMap<>(numRegions); 210 servers = new ServerName[numServers]; 211 serversPerHost = new int[numHosts][]; 212 serversPerRack = new int[numRacks][]; 213 regions = new RegionInfo[numRegions]; 214 regionIndexToServerIndex = new int[numRegions]; 215 initialRegionIndexToServerIndex = new int[numRegions]; 216 regionIndexToTableIndex = new int[numRegions]; 217 regionIndexToPrimaryIndex = new int[numRegions]; 218 regionLoads = new Deque[numRegions]; 219 220 regionLocations = new int[numRegions][]; 221 serverIndicesSortedByRegionCount = new Integer[numServers]; 222 serverIndicesSortedByLocality = new Integer[numServers]; 223 localityPerServer = new float[numServers]; 224 225 serverIndexToHostIndex = new int[numServers]; 226 serverIndexToRackIndex = new int[numServers]; 227 regionsPerServer = new int[numServers][]; 228 serverIndexToRegionsOffset = new int[numServers]; 229 regionsPerHost = new int[numHosts][]; 230 regionsPerRack = new int[numRacks][]; 231 colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers]; 232 colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts]; 233 colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks]; 234 235 int regionIndex = 0, regionPerServerIndex = 0; 236 237 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 238 if (entry.getKey() == null) { 239 LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); 240 continue; 241 } 242 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 243 244 // keep the servername if this is the first server name for this hostname 245 // or this servername has the newest startcode. 246 if ( 247 servers[serverIndex] == null 248 || servers[serverIndex].getStartcode() < entry.getKey().getStartcode() 249 ) { 250 servers[serverIndex] = entry.getKey(); 251 } 252 253 if (regionsPerServer[serverIndex] != null) { 254 // there is another server with the same hostAndPort in ClusterState. 255 // allocate the array for the total size 256 regionsPerServer[serverIndex] = 257 new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; 258 } else { 259 regionsPerServer[serverIndex] = new int[entry.getValue().size()]; 260 } 261 colocatedReplicaCountsPerServer[serverIndex] = 262 new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 263 serverIndicesSortedByRegionCount[serverIndex] = serverIndex; 264 serverIndicesSortedByLocality[serverIndex] = serverIndex; 265 } 266 267 hosts = new String[numHosts]; 268 for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) { 269 hosts[entry.getValue()] = entry.getKey(); 270 } 271 racks = new String[numRacks]; 272 for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) { 273 racks[entry.getValue()] = entry.getKey(); 274 } 275 276 for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) { 277 int serverIndex = serversToIndex.get(entry.getKey().getAddress()); 278 regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; 279 280 int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); 281 serverIndexToHostIndex[serverIndex] = hostIndex; 282 283 int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); 284 serverIndexToRackIndex[serverIndex] = rackIndex; 285 286 for (RegionInfo region : entry.getValue()) { 287 registerRegion(region, regionIndex, serverIndex, loads, regionFinder); 288 regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; 289 regionIndex++; 290 } 291 serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; 292 } 293 294 for (RegionInfo region : unassignedRegions) { 295 registerRegion(region, regionIndex, -1, loads, regionFinder); 296 regionIndex++; 297 } 298 299 for (int i = 0; i < serversPerHostList.size(); i++) { 300 serversPerHost[i] = new int[serversPerHostList.get(i).size()]; 301 for (int j = 0; j < serversPerHost[i].length; j++) { 302 serversPerHost[i][j] = serversPerHostList.get(i).get(j); 303 LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i); 304 } 305 if (serversPerHost[i].length > 1) { 306 multiServersPerHost = true; 307 } 308 } 309 310 for (int i = 0; i < serversPerRackList.size(); i++) { 311 serversPerRack[i] = new int[serversPerRackList.get(i).size()]; 312 for (int j = 0; j < serversPerRack[i].length; j++) { 313 serversPerRack[i][j] = serversPerRackList.get(i).get(j); 314 LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i); 315 } 316 } 317 318 numTables = tables.size(); 319 LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts, 320 numRacks); 321 numRegionsPerServerPerTable = new int[numTables][numServers]; 322 numRegionsPerTable = new int[numTables]; 323 324 for (int i = 0; i < numTables; i++) { 325 for (int j = 0; j < numServers; j++) { 326 numRegionsPerServerPerTable[i][j] = 0; 327 } 328 } 329 330 for (int i = 0; i < regionIndexToServerIndex.length; i++) { 331 if (regionIndexToServerIndex[i] >= 0) { 332 numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++; 333 numRegionsPerTable[regionIndexToTableIndex[i]]++; 334 } 335 } 336 337 for (int i = 0; i < regions.length; i++) { 338 RegionInfo info = regions[i]; 339 if (RegionReplicaUtil.isDefaultReplica(info)) { 340 regionIndexToPrimaryIndex[i] = i; 341 } else { 342 hasRegionReplicas = true; 343 RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); 344 regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); 345 } 346 } 347 348 for (int i = 0; i < regionsPerServer.length; i++) { 349 colocatedReplicaCountsPerServer[i] = 350 new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0); 351 for (int j = 0; j < regionsPerServer[i].length; j++) { 352 int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; 353 colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex); 354 } 355 } 356 // compute regionsPerHost 357 if (multiServersPerHost) { 358 populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost, 359 serversPerHost); 360 } 361 362 // compute regionsPerRack 363 if (numRacks > 1) { 364 populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack, 365 serversPerRack); 366 } 367 } 368 369 private void populateRegionPerLocationFromServer(int[][] regionsPerLocation, 370 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) { 371 for (int i = 0; i < serversPerLocation.length; i++) { 372 int numRegionsPerLocation = 0; 373 for (int j = 0; j < serversPerLocation[i].length; j++) { 374 numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length; 375 } 376 regionsPerLocation[i] = new int[numRegionsPerLocation]; 377 colocatedReplicaCountsPerLocation[i] = 378 new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0); 379 } 380 381 for (int i = 0; i < serversPerLocation.length; i++) { 382 int numRegionPerLocationIndex = 0; 383 for (int j = 0; j < serversPerLocation[i].length; j++) { 384 for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) { 385 int region = regionsPerServer[serversPerLocation[i][j]][k]; 386 regionsPerLocation[i][numRegionPerLocationIndex] = region; 387 int primaryIndex = regionIndexToPrimaryIndex[region]; 388 colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex); 389 numRegionPerLocationIndex++; 390 } 391 } 392 } 393 394 } 395 396 /** Helper for Cluster constructor to handle a region */ 397 private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, 398 Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder) { 399 String tableName = region.getTable().getNameAsString(); 400 if (!tablesToIndex.containsKey(tableName)) { 401 tables.add(tableName); 402 tablesToIndex.put(tableName, tablesToIndex.size()); 403 } 404 int tableIndex = tablesToIndex.get(tableName); 405 406 regionsToIndex.put(region, regionIndex); 407 regions[regionIndex] = region; 408 regionIndexToServerIndex[regionIndex] = serverIndex; 409 initialRegionIndexToServerIndex[regionIndex] = serverIndex; 410 regionIndexToTableIndex[regionIndex] = tableIndex; 411 412 // region load 413 if (loads != null) { 414 Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString()); 415 // That could have failed if the RegionLoad is using the other regionName 416 if (rl == null) { 417 // Try getting the region load using encoded name. 418 rl = loads.get(region.getEncodedName()); 419 } 420 regionLoads[regionIndex] = rl; 421 } 422 423 if (regionFinder != null) { 424 // region location 425 List<ServerName> loc = regionFinder.getTopBlockLocations(region); 426 regionLocations[regionIndex] = new int[loc.size()]; 427 for (int i = 0; i < loc.size(); i++) { 428 regionLocations[regionIndex][i] = loc.get(i) == null 429 ? -1 430 : (serversToIndex.get(loc.get(i).getAddress()) == null 431 ? -1 432 : serversToIndex.get(loc.get(i).getAddress())); 433 } 434 } 435 } 436 437 /** 438 * Returns true iff a given server has less regions than the balanced amount 439 */ 440 public boolean serverHasTooFewRegions(int server) { 441 int minLoad = this.numRegions / numServers; 442 int numRegions = getNumRegions(server); 443 return numRegions < minLoad; 444 } 445 446 /** 447 * Retrieves and lazily initializes a field storing the locality of every region/server 448 * combination 449 */ 450 public float[][] getOrComputeRackLocalities() { 451 if (rackLocalities == null || regionsToMostLocalEntities == null) { 452 computeCachedLocalities(); 453 } 454 return rackLocalities; 455 } 456 457 /** 458 * Lazily initializes and retrieves a mapping of region -> server for which region has the highest 459 * the locality 460 */ 461 public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { 462 if (rackLocalities == null || regionsToMostLocalEntities == null) { 463 computeCachedLocalities(); 464 } 465 return regionsToMostLocalEntities[type.ordinal()]; 466 } 467 468 /** 469 * Looks up locality from cache of localities. Will create cache if it does not already exist. 470 */ 471 public float getOrComputeLocality(int region, int entity, 472 BalancerClusterState.LocalityType type) { 473 switch (type) { 474 case SERVER: 475 return getLocalityOfRegion(region, entity); 476 case RACK: 477 return getOrComputeRackLocalities()[region][entity]; 478 default: 479 throw new IllegalArgumentException("Unsupported LocalityType: " + type); 480 } 481 } 482 483 /** 484 * Returns locality weighted by region size in MB. Will create locality cache if it does not 485 * already exist. 486 */ 487 public double getOrComputeWeightedLocality(int region, int server, 488 BalancerClusterState.LocalityType type) { 489 return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); 490 } 491 492 /** 493 * Returns the size in MB from the most recent RegionLoad for region 494 */ 495 public int getRegionSizeMB(int region) { 496 Deque<BalancerRegionLoad> load = regionLoads[region]; 497 // This means regions have no actual data on disk 498 if (load == null) { 499 return 0; 500 } 501 return regionLoads[region].getLast().getStorefileSizeMB(); 502 } 503 504 /** 505 * Computes and caches the locality for each region/rack combinations, as well as storing a 506 * mapping of region -> server and region -> rack such that server and rack have the highest 507 * locality for region 508 */ 509 private void computeCachedLocalities() { 510 rackLocalities = new float[numRegions][numRacks]; 511 regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; 512 513 // Compute localities and find most local server per region 514 for (int region = 0; region < numRegions; region++) { 515 int serverWithBestLocality = 0; 516 float bestLocalityForRegion = 0; 517 for (int server = 0; server < numServers; server++) { 518 // Aggregate per-rack locality 519 float locality = getLocalityOfRegion(region, server); 520 int rack = serverIndexToRackIndex[server]; 521 int numServersInRack = serversPerRack[rack].length; 522 rackLocalities[region][rack] += locality / numServersInRack; 523 524 if (locality > bestLocalityForRegion) { 525 serverWithBestLocality = server; 526 bestLocalityForRegion = locality; 527 } 528 } 529 regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; 530 531 // Find most local rack per region 532 int rackWithBestLocality = 0; 533 float bestRackLocalityForRegion = 0.0f; 534 for (int rack = 0; rack < numRacks; rack++) { 535 float rackLocality = rackLocalities[region][rack]; 536 if (rackLocality > bestRackLocalityForRegion) { 537 bestRackLocalityForRegion = rackLocality; 538 rackWithBestLocality = rack; 539 } 540 } 541 regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; 542 } 543 544 } 545 546 /** 547 * Returns the size of hFiles from the most recent RegionLoad for region 548 */ 549 public int getTotalRegionHFileSizeMB(int region) { 550 Deque<BalancerRegionLoad> load = regionLoads[region]; 551 if (load == null) { 552 // This means, that the region has no actual data on disk 553 return 0; 554 } 555 return regionLoads[region].getLast().getRegionSizeMB(); 556 } 557 558 /** 559 * Returns the weighted cache ratio of a region on the given region server 560 */ 561 public float getOrComputeWeightedRegionCacheRatio(int region, int server) { 562 return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server); 563 } 564 565 /** 566 * Returns the amount by which a region is cached on a given region server. If the region is not 567 * currently hosted on the given region server, then find out if it was previously hosted there 568 * and return the old cache ratio. 569 */ 570 protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) { 571 float regionCacheRatio = 0.0f; 572 573 // Get the current region cache ratio if the region is hosted on the server regionServerIndex 574 for (int regionIndex : regionsPerServer[regionServerIndex]) { 575 if (region != regionIndex) { 576 continue; 577 } 578 579 Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex]; 580 581 // The region is currently hosted on this region server. Get the region cache ratio for this 582 // region on this server 583 regionCacheRatio = 584 regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio(); 585 586 return regionCacheRatio; 587 } 588 589 // Region is not currently hosted on this server. Check if the region was cached on this 590 // server earlier. This can happen when the server was shutdown and the cache was persisted. 591 // Search using the region name and server name and not the index id and server id as these ids 592 // may change when a server is marked as dead or a new server is added. 593 String regionEncodedName = regions[region].getEncodedName(); 594 ServerName serverName = servers[regionServerIndex]; 595 if ( 596 regionCacheRatioOnOldServerMap != null 597 && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName) 598 ) { 599 Pair<ServerName, Float> cacheRatioOfRegionOnServer = 600 regionCacheRatioOnOldServerMap.get(regionEncodedName); 601 if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) { 602 regionCacheRatio = cacheRatioOfRegionOnServer.getSecond(); 603 if (LOG.isDebugEnabled()) { 604 LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName, 605 serverName, regionCacheRatio); 606 } 607 } 608 } 609 return regionCacheRatio; 610 } 611 612 /** 613 * Populate the maps containing information about how much a region is cached on a region server. 614 */ 615 private void computeRegionServerRegionCacheRatio() { 616 regionIndexServerIndexRegionCachedRatio = new HashMap<>(); 617 regionServerIndexWithBestRegionCachedRatio = new int[numRegions]; 618 619 for (int region = 0; region < numRegions; region++) { 620 float bestRegionCacheRatio = 0.0f; 621 int serverWithBestRegionCacheRatio = 0; 622 for (int server = 0; server < numServers; server++) { 623 float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server); 624 if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) { 625 // A region with cache ratio 0 on a server means nothing. Hence, just make a note of 626 // cache ratio only if the cache ratio is greater than 0. 627 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 628 regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio); 629 } 630 if (regionCacheRatio > bestRegionCacheRatio) { 631 serverWithBestRegionCacheRatio = server; 632 // If the server currently hosting the region has equal cache ratio to a historical 633 // server, consider the current server to keep hosting the region 634 bestRegionCacheRatio = regionCacheRatio; 635 } else if ( 636 regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region] 637 ) { 638 // If two servers have same region cache ratio, then the server currently hosting the 639 // region 640 // should retain the region 641 serverWithBestRegionCacheRatio = server; 642 } 643 } 644 regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio; 645 Pair<Integer, Integer> regionServerPair = 646 new Pair<>(region, regionIndexToServerIndex[region]); 647 float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair); 648 if (tempRegionCacheRatio > bestRegionCacheRatio) { 649 LOG.warn( 650 "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the " 651 + "best region cache ratio {} on server {}", 652 regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]], 653 tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]); 654 } 655 } 656 } 657 658 protected float getOrComputeRegionCacheRatio(int region, int server) { 659 if ( 660 regionServerIndexWithBestRegionCachedRatio == null 661 || regionIndexServerIndexRegionCachedRatio.isEmpty() 662 ) { 663 computeRegionServerRegionCacheRatio(); 664 } 665 666 Pair<Integer, Integer> regionServerPair = new Pair<>(region, server); 667 return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair) 668 ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair) 669 : 0.0f; 670 } 671 672 public int[] getOrComputeServerWithBestRegionCachedRatio() { 673 if ( 674 regionServerIndexWithBestRegionCachedRatio == null 675 || regionIndexServerIndexRegionCachedRatio.isEmpty() 676 ) { 677 computeRegionServerRegionCacheRatio(); 678 } 679 return regionServerIndexWithBestRegionCachedRatio; 680 } 681 682 /** 683 * Maps region index to rack index 684 */ 685 public int getRackForRegion(int region) { 686 return serverIndexToRackIndex[regionIndexToServerIndex[region]]; 687 } 688 689 enum LocalityType { 690 SERVER, 691 RACK 692 } 693 694 public void doAction(BalanceAction action) { 695 switch (action.getType()) { 696 case NULL: 697 break; 698 case ASSIGN_REGION: 699 // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings 700 assert action instanceof AssignRegionAction : action.getClass(); 701 AssignRegionAction ar = (AssignRegionAction) action; 702 regionsPerServer[ar.getServer()] = 703 addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); 704 regionMoved(ar.getRegion(), -1, ar.getServer()); 705 break; 706 case MOVE_REGION: 707 assert action instanceof MoveRegionAction : action.getClass(); 708 MoveRegionAction mra = (MoveRegionAction) action; 709 regionsPerServer[mra.getFromServer()] = 710 removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); 711 regionsPerServer[mra.getToServer()] = 712 addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); 713 regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); 714 break; 715 case SWAP_REGIONS: 716 assert action instanceof SwapRegionsAction : action.getClass(); 717 SwapRegionsAction a = (SwapRegionsAction) action; 718 regionsPerServer[a.getFromServer()] = 719 replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); 720 regionsPerServer[a.getToServer()] = 721 replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); 722 regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); 723 regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); 724 break; 725 default: 726 throw new RuntimeException("Uknown action:" + action.getType()); 727 } 728 } 729 730 /** 731 * Return true if the placement of region on server would lower the availability of the region in 732 * question 733 * @return true or false 734 */ 735 boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { 736 if (!serversToIndex.containsKey(serverName.getAddress())) { 737 return false; // safeguard against race between cluster.servers and servers from LB method 738 // args 739 } 740 int server = serversToIndex.get(serverName.getAddress()); 741 int region = regionsToIndex.get(regionInfo); 742 743 // Region replicas for same region should better assign to different servers 744 for (int i : regionsPerServer[server]) { 745 RegionInfo otherRegionInfo = regions[i]; 746 if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { 747 return true; 748 } 749 } 750 751 int primary = regionIndexToPrimaryIndex[region]; 752 if (primary == -1) { 753 return false; 754 } 755 // there is a subset relation for server < host < rack 756 // check server first 757 int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary); 758 if (result != 0) { 759 return result > 0; 760 } 761 762 // check host 763 if (multiServersPerHost) { 764 result = checkLocationForPrimary(serverIndexToHostIndex[server], 765 colocatedReplicaCountsPerHost, primary); 766 if (result != 0) { 767 return result > 0; 768 } 769 } 770 771 // check rack 772 if (numRacks > 1) { 773 result = checkLocationForPrimary(serverIndexToRackIndex[server], 774 colocatedReplicaCountsPerRack, primary); 775 if (result != 0) { 776 return result > 0; 777 } 778 } 779 return false; 780 } 781 782 /** 783 * Common method for better solution check. 784 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 785 * colocatedReplicaCountsPerRack 786 * @return 1 for better, -1 for no better, 0 for unknown 787 */ 788 private int checkLocationForPrimary(int location, 789 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) { 790 if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) { 791 // check for whether there are other Locations that we can place this region 792 for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) { 793 if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) { 794 return 1; // meaning there is a better Location 795 } 796 } 797 return -1; // there is not a better Location to place this 798 } 799 return 0; 800 } 801 802 void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { 803 if (!serversToIndex.containsKey(serverName.getAddress())) { 804 return; 805 } 806 int server = serversToIndex.get(serverName.getAddress()); 807 int region = regionsToIndex.get(regionInfo); 808 doAction(new AssignRegionAction(region, server)); 809 } 810 811 void regionMoved(int region, int oldServer, int newServer) { 812 regionIndexToServerIndex[region] = newServer; 813 if (initialRegionIndexToServerIndex[region] == newServer) { 814 numMovedRegions--; // region moved back to original location 815 } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { 816 numMovedRegions++; // region moved from original location 817 } 818 int tableIndex = regionIndexToTableIndex[region]; 819 if (oldServer >= 0) { 820 numRegionsPerServerPerTable[tableIndex][oldServer]--; 821 } 822 numRegionsPerServerPerTable[tableIndex][newServer]++; 823 824 // update for servers 825 int primary = regionIndexToPrimaryIndex[region]; 826 if (oldServer >= 0) { 827 colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary); 828 } 829 colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary); 830 831 // update for hosts 832 if (multiServersPerHost) { 833 updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost, 834 oldServer, newServer, primary, region); 835 } 836 837 // update for racks 838 if (numRacks > 1) { 839 updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack, 840 oldServer, newServer, primary, region); 841 } 842 } 843 844 /** 845 * Common method for per host and per Location region index updates when a region is moved. 846 * @param serverIndexToLocation serverIndexToHostIndex or serverIndexToLocationIndex 847 * @param regionsPerLocation regionsPerHost or regionsPerLocation 848 * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or 849 * colocatedReplicaCountsPerRack 850 */ 851 private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation, 852 Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer, 853 int primary, int region) { 854 int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1; 855 int newLocation = serverIndexToLocation[newServer]; 856 if (newLocation != oldLocation) { 857 regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region); 858 colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary); 859 if (oldLocation >= 0) { 860 regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region); 861 colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary); 862 } 863 } 864 865 } 866 867 int[] removeRegion(int[] regions, int regionIndex) { 868 // TODO: this maybe costly. Consider using linked lists 869 int[] newRegions = new int[regions.length - 1]; 870 int i = 0; 871 for (i = 0; i < regions.length; i++) { 872 if (regions[i] == regionIndex) { 873 break; 874 } 875 newRegions[i] = regions[i]; 876 } 877 System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); 878 return newRegions; 879 } 880 881 int[] addRegion(int[] regions, int regionIndex) { 882 int[] newRegions = new int[regions.length + 1]; 883 System.arraycopy(regions, 0, newRegions, 0, regions.length); 884 newRegions[newRegions.length - 1] = regionIndex; 885 return newRegions; 886 } 887 888 int[] addRegionSorted(int[] regions, int regionIndex) { 889 int[] newRegions = new int[regions.length + 1]; 890 int i = 0; 891 for (i = 0; i < regions.length; i++) { // find the index to insert 892 if (regions[i] > regionIndex) { 893 break; 894 } 895 } 896 System.arraycopy(regions, 0, newRegions, 0, i); // copy first half 897 System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half 898 newRegions[i] = regionIndex; 899 900 return newRegions; 901 } 902 903 int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { 904 int i = 0; 905 for (i = 0; i < regions.length; i++) { 906 if (regions[i] == regionIndex) { 907 regions[i] = newRegionIndex; 908 break; 909 } 910 } 911 return regions; 912 } 913 914 void sortServersByRegionCount() { 915 Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); 916 } 917 918 int getNumRegions(int server) { 919 return regionsPerServer[server].length; 920 } 921 922 public Comparator<Integer> getNumRegionsComparator() { 923 return numRegionsComparator; 924 } 925 926 boolean contains(int[] arr, int val) { 927 return Arrays.binarySearch(arr, val) >= 0; 928 } 929 930 private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions); 931 932 int getLowestLocalityRegionOnServer(int serverIndex) { 933 if (regionFinder != null) { 934 float lowestLocality = 1.0f; 935 int lowestLocalityRegionIndex = -1; 936 if (regionsPerServer[serverIndex].length == 0) { 937 // No regions on that region server 938 return -1; 939 } 940 for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { 941 int regionIndex = regionsPerServer[serverIndex][j]; 942 HDFSBlocksDistribution distribution = 943 regionFinder.getBlockDistribution(regions[regionIndex]); 944 float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); 945 // skip empty region 946 if (distribution.getUniqueBlocksTotalWeight() == 0) { 947 continue; 948 } 949 if (locality < lowestLocality) { 950 lowestLocality = locality; 951 lowestLocalityRegionIndex = j; 952 } 953 } 954 if (lowestLocalityRegionIndex == -1) { 955 return -1; 956 } 957 if (LOG.isTraceEnabled()) { 958 LOG.trace("Lowest locality region is " 959 + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] 960 .getRegionNameAsString() 961 + " with locality " + lowestLocality + " and its region server contains " 962 + regionsPerServer[serverIndex].length + " regions"); 963 } 964 return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; 965 } else { 966 return -1; 967 } 968 } 969 970 float getLocalityOfRegion(int region, int server) { 971 if (regionFinder != null) { 972 HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); 973 return distribution.getBlockLocalityIndex(servers[server].getHostname()); 974 } else { 975 return 0f; 976 } 977 } 978 979 void setNumRegions(int numRegions) { 980 this.numRegions = numRegions; 981 } 982 983 void setNumMovedRegions(int numMovedRegions) { 984 this.numMovedRegions = numMovedRegions; 985 } 986 987 @Override 988 public String toString() { 989 StringBuilder desc = new StringBuilder("Cluster={servers=["); 990 for (ServerName sn : servers) { 991 desc.append(sn.getAddress().toString()).append(", "); 992 } 993 desc.append("], serverIndicesSortedByRegionCount=") 994 .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") 995 .append(Arrays.deepToString(regionsPerServer)); 996 997 desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) 998 .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) 999 .append('}'); 1000 return desc.toString(); 1001 } 1002}