001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import org.agrona.collections.Hashing;
030import org.agrona.collections.Int2IntCounterMap;
031import org.apache.hadoop.hbase.HDFSBlocksDistribution;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionReplicaUtil;
035import org.apache.hadoop.hbase.master.RackManager;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * An efficient array based implementation similar to ClusterState for keeping the status of the
044 * cluster in terms of region assignment and distribution. LoadBalancers, such as
045 * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap
046 * manipulations are very costly, which is why this class uses mostly indexes and arrays.
047 * <p/>
048 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
049 * topology in terms of server names, hostnames and racks.
050 */
051@InterfaceAudience.Private
052class BalancerClusterState {
053
  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  // serverIndex -> ServerName. When several ServerNames share one host:port address, the one with
  // the newest start code is kept here (see the constructor).
  ServerName[] servers;
  // ServerName uniquely identifies a region server. Multiple region servers can run on the same
  // host, so hosts get their own index space: hostIndex -> hostname.
  String[] hosts;
  // rackIndex -> rack name as reported by the RackManager.
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  // tableIndex -> table name, in the order tables were first registered.
  ArrayList<String> tables;
  // regionIndex -> region. Assigned regions are indexed first, then unassigned ones.
  RegionInfo[] regions;
  // regionIndex -> load history of the region; an entry is null when no load is known for it.
  Deque<BalancerRegionLoad>[] regionLoads;
  private RegionLocationFinder regionFinder;

  // regionIndex -> server indexes of the region's top block locations, in the order returned by
  // the RegionLocationFinder (-1 for a location that is not a known server). Only filled when a
  // RegionLocationFinder was supplied.
  int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list
  // hostIndex -> list of regions; only populated when some host runs more than one server.
  int[][] regionsPerHost; // hostIndex -> list of regions
  // rackIndex -> list of regions; only populated when there is more than one rack.
  int[][] regionsPerRack; // rackIndex -> region list
  // serverIndex -> counts of colocated replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerServer;
  // hostIndex -> counts of colocated replicas by primary region index (same condition as
  // regionsPerHost)
  Int2IntCounterMap[] colocatedReplicaCountsPerHost;
  // rackIndex -> counts of colocated replicas by primary region index (same condition as
  // regionsPerRack)
  Int2IntCounterMap[] colocatedReplicaCountsPerRack;

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  // regionIndex -> serverIndex; -1 for regions that were registered as unassigned.
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> region count
  int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS
  // regionIndex -> regionIndex of its primary replica; -1 when the primary is not part of this
  // cluster state.
  int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary
  boolean hasRegionReplicas = false; // whether any region in this state is a non-default replica

  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  // Reverse lookups from external identifiers to the dense indexes used by the arrays above.
  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex;
  Map<String, Integer> racksToIndex;
  Map<String, Integer> tablesToIndex;
  Map<RegionInfo, Integer> regionsToIndex;
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;

  int numMovedRegions = 0; // num moved regions from the initial configuration
  Map<ServerName, List<RegionInfo>> clusterState;

  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack (lazily computed)
  private float[][] rackLocalities;
  // Maps localityType ordinal -> region -> [server|rack]Index with highest locality (lazily
  // computed)
  private int[][] regionsToMostLocalEntities;
  // Maps (regionIndex, serverIndex) -> cache ratio of the region on that server (lazily computed)
  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
  // Maps regionIndex -> serverIndex with best region cache ratio (lazily computed)
  private int[] regionServerIndexWithBestRegionCachedRatio;
  // Maps encoded region name -> (old ServerName, cache ratio of the region on that old server)
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
124
125  static class DefaultRackManager extends RackManager {
126    @Override
127    public String getRack(ServerName server) {
128      return UNKNOWN_RACK;
129    }
130  }
131
132  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
133    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
134    RackManager rackManager) {
135    this(null, clusterState, loads, regionFinder, rackManager, null);
136  }
137
138  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
139    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder,
140    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
141    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
142  }
143
144  @SuppressWarnings("unchecked")
145  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
146    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
147    RegionLocationFinder regionFinder, RackManager rackManager,
148    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
149    if (unassignedRegions == null) {
150      unassignedRegions = Collections.emptyList();
151    }
152
153    serversToIndex = new HashMap<>();
154    hostsToIndex = new HashMap<>();
155    racksToIndex = new HashMap<>();
156    tablesToIndex = new HashMap<>();
157
158    // TODO: We should get the list of tables from master
159    tables = new ArrayList<>();
160    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
161
162    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
163
164    numRegions = 0;
165
166    List<List<Integer>> serversPerHostList = new ArrayList<>();
167    List<List<Integer>> serversPerRackList = new ArrayList<>();
168    this.clusterState = clusterState;
169    this.regionFinder = regionFinder;
170
171    // Use servername and port as there can be dead servers in this list. We want everything with
172    // a matching hostname and port to have the same index.
173    for (ServerName sn : clusterState.keySet()) {
174      if (sn == null) {
175        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
176          + "skipping; unassigned regions?");
177        if (LOG.isTraceEnabled()) {
178          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
179        }
180        continue;
181      }
182      if (serversToIndex.get(sn.getAddress()) == null) {
183        serversToIndex.put(sn.getAddress(), numServers++);
184      }
185      if (!hostsToIndex.containsKey(sn.getHostname())) {
186        hostsToIndex.put(sn.getHostname(), numHosts++);
187        serversPerHostList.add(new ArrayList<>(1));
188      }
189
190      int serverIndex = serversToIndex.get(sn.getAddress());
191      int hostIndex = hostsToIndex.get(sn.getHostname());
192      serversPerHostList.get(hostIndex).add(serverIndex);
193
194      String rack = this.rackManager.getRack(sn);
195      if (!racksToIndex.containsKey(rack)) {
196        racksToIndex.put(rack, numRacks++);
197        serversPerRackList.add(new ArrayList<>());
198      }
199      int rackIndex = racksToIndex.get(rack);
200      serversPerRackList.get(rackIndex).add(serverIndex);
201    }
202    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
203    // Count how many regions there are.
204    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
205      numRegions += entry.getValue().size();
206    }
207    numRegions += unassignedRegions.size();
208
209    regionsToIndex = new HashMap<>(numRegions);
210    servers = new ServerName[numServers];
211    serversPerHost = new int[numHosts][];
212    serversPerRack = new int[numRacks][];
213    regions = new RegionInfo[numRegions];
214    regionIndexToServerIndex = new int[numRegions];
215    initialRegionIndexToServerIndex = new int[numRegions];
216    regionIndexToTableIndex = new int[numRegions];
217    regionIndexToPrimaryIndex = new int[numRegions];
218    regionLoads = new Deque[numRegions];
219
220    regionLocations = new int[numRegions][];
221    serverIndicesSortedByRegionCount = new Integer[numServers];
222    serverIndicesSortedByLocality = new Integer[numServers];
223    localityPerServer = new float[numServers];
224
225    serverIndexToHostIndex = new int[numServers];
226    serverIndexToRackIndex = new int[numServers];
227    regionsPerServer = new int[numServers][];
228    serverIndexToRegionsOffset = new int[numServers];
229    regionsPerHost = new int[numHosts][];
230    regionsPerRack = new int[numRacks][];
231    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
232    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
233    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
234
235    int regionIndex = 0, regionPerServerIndex = 0;
236
237    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
238      if (entry.getKey() == null) {
239        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
240        continue;
241      }
242      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
243
244      // keep the servername if this is the first server name for this hostname
245      // or this servername has the newest startcode.
246      if (
247        servers[serverIndex] == null
248          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
249      ) {
250        servers[serverIndex] = entry.getKey();
251      }
252
253      if (regionsPerServer[serverIndex] != null) {
254        // there is another server with the same hostAndPort in ClusterState.
255        // allocate the array for the total size
256        regionsPerServer[serverIndex] =
257          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
258      } else {
259        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
260      }
261      colocatedReplicaCountsPerServer[serverIndex] =
262        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
263      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
264      serverIndicesSortedByLocality[serverIndex] = serverIndex;
265    }
266
267    hosts = new String[numHosts];
268    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
269      hosts[entry.getValue()] = entry.getKey();
270    }
271    racks = new String[numRacks];
272    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
273      racks[entry.getValue()] = entry.getKey();
274    }
275
276    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
277      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
278      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
279
280      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
281      serverIndexToHostIndex[serverIndex] = hostIndex;
282
283      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
284      serverIndexToRackIndex[serverIndex] = rackIndex;
285
286      for (RegionInfo region : entry.getValue()) {
287        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
288        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
289        regionIndex++;
290      }
291      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
292    }
293
294    for (RegionInfo region : unassignedRegions) {
295      registerRegion(region, regionIndex, -1, loads, regionFinder);
296      regionIndex++;
297    }
298
299    for (int i = 0; i < serversPerHostList.size(); i++) {
300      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
301      for (int j = 0; j < serversPerHost[i].length; j++) {
302        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
303        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
304      }
305      if (serversPerHost[i].length > 1) {
306        multiServersPerHost = true;
307      }
308    }
309
310    for (int i = 0; i < serversPerRackList.size(); i++) {
311      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
312      for (int j = 0; j < serversPerRack[i].length; j++) {
313        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
314        LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
315      }
316    }
317
318    numTables = tables.size();
319    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
320      numRacks);
321    numRegionsPerServerPerTable = new int[numTables][numServers];
322    numRegionsPerTable = new int[numTables];
323
324    for (int i = 0; i < numTables; i++) {
325      for (int j = 0; j < numServers; j++) {
326        numRegionsPerServerPerTable[i][j] = 0;
327      }
328    }
329
330    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
331      if (regionIndexToServerIndex[i] >= 0) {
332        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
333        numRegionsPerTable[regionIndexToTableIndex[i]]++;
334      }
335    }
336
337    for (int i = 0; i < regions.length; i++) {
338      RegionInfo info = regions[i];
339      if (RegionReplicaUtil.isDefaultReplica(info)) {
340        regionIndexToPrimaryIndex[i] = i;
341      } else {
342        hasRegionReplicas = true;
343        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
344        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
345      }
346    }
347
348    for (int i = 0; i < regionsPerServer.length; i++) {
349      colocatedReplicaCountsPerServer[i] =
350        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
351      for (int j = 0; j < regionsPerServer[i].length; j++) {
352        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
353        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
354      }
355    }
356    // compute regionsPerHost
357    if (multiServersPerHost) {
358      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
359        serversPerHost);
360    }
361
362    // compute regionsPerRack
363    if (numRacks > 1) {
364      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
365        serversPerRack);
366    }
367  }
368
369  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
370    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
371    for (int i = 0; i < serversPerLocation.length; i++) {
372      int numRegionsPerLocation = 0;
373      for (int j = 0; j < serversPerLocation[i].length; j++) {
374        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
375      }
376      regionsPerLocation[i] = new int[numRegionsPerLocation];
377      colocatedReplicaCountsPerLocation[i] =
378        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
379    }
380
381    for (int i = 0; i < serversPerLocation.length; i++) {
382      int numRegionPerLocationIndex = 0;
383      for (int j = 0; j < serversPerLocation[i].length; j++) {
384        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
385          int region = regionsPerServer[serversPerLocation[i][j]][k];
386          regionsPerLocation[i][numRegionPerLocationIndex] = region;
387          int primaryIndex = regionIndexToPrimaryIndex[region];
388          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
389          numRegionPerLocationIndex++;
390        }
391      }
392    }
393
394  }
395
396  /** Helper for Cluster constructor to handle a region */
397  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
398    Map<String, Deque<BalancerRegionLoad>> loads, RegionLocationFinder regionFinder) {
399    String tableName = region.getTable().getNameAsString();
400    if (!tablesToIndex.containsKey(tableName)) {
401      tables.add(tableName);
402      tablesToIndex.put(tableName, tablesToIndex.size());
403    }
404    int tableIndex = tablesToIndex.get(tableName);
405
406    regionsToIndex.put(region, regionIndex);
407    regions[regionIndex] = region;
408    regionIndexToServerIndex[regionIndex] = serverIndex;
409    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
410    regionIndexToTableIndex[regionIndex] = tableIndex;
411
412    // region load
413    if (loads != null) {
414      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
415      // That could have failed if the RegionLoad is using the other regionName
416      if (rl == null) {
417        // Try getting the region load using encoded name.
418        rl = loads.get(region.getEncodedName());
419      }
420      regionLoads[regionIndex] = rl;
421    }
422
423    if (regionFinder != null) {
424      // region location
425      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
426      regionLocations[regionIndex] = new int[loc.size()];
427      for (int i = 0; i < loc.size(); i++) {
428        regionLocations[regionIndex][i] = loc.get(i) == null
429          ? -1
430          : (serversToIndex.get(loc.get(i).getAddress()) == null
431            ? -1
432            : serversToIndex.get(loc.get(i).getAddress()));
433      }
434    }
435  }
436
437  /**
438   * Returns true iff a given server has less regions than the balanced amount
439   */
440  public boolean serverHasTooFewRegions(int server) {
441    int minLoad = this.numRegions / numServers;
442    int numRegions = getNumRegions(server);
443    return numRegions < minLoad;
444  }
445
446  /**
447   * Retrieves and lazily initializes a field storing the locality of every region/server
448   * combination
449   */
450  public float[][] getOrComputeRackLocalities() {
451    if (rackLocalities == null || regionsToMostLocalEntities == null) {
452      computeCachedLocalities();
453    }
454    return rackLocalities;
455  }
456
457  /**
458   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
459   * the locality
460   */
461  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
462    if (rackLocalities == null || regionsToMostLocalEntities == null) {
463      computeCachedLocalities();
464    }
465    return regionsToMostLocalEntities[type.ordinal()];
466  }
467
468  /**
469   * Looks up locality from cache of localities. Will create cache if it does not already exist.
470   */
471  public float getOrComputeLocality(int region, int entity,
472    BalancerClusterState.LocalityType type) {
473    switch (type) {
474      case SERVER:
475        return getLocalityOfRegion(region, entity);
476      case RACK:
477        return getOrComputeRackLocalities()[region][entity];
478      default:
479        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
480    }
481  }
482
483  /**
484   * Returns locality weighted by region size in MB. Will create locality cache if it does not
485   * already exist.
486   */
487  public double getOrComputeWeightedLocality(int region, int server,
488    BalancerClusterState.LocalityType type) {
489    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
490  }
491
492  /**
493   * Returns the size in MB from the most recent RegionLoad for region
494   */
495  public int getRegionSizeMB(int region) {
496    Deque<BalancerRegionLoad> load = regionLoads[region];
497    // This means regions have no actual data on disk
498    if (load == null) {
499      return 0;
500    }
501    return regionLoads[region].getLast().getStorefileSizeMB();
502  }
503
504  /**
505   * Computes and caches the locality for each region/rack combinations, as well as storing a
506   * mapping of region -> server and region -> rack such that server and rack have the highest
507   * locality for region
508   */
509  private void computeCachedLocalities() {
510    rackLocalities = new float[numRegions][numRacks];
511    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
512
513    // Compute localities and find most local server per region
514    for (int region = 0; region < numRegions; region++) {
515      int serverWithBestLocality = 0;
516      float bestLocalityForRegion = 0;
517      for (int server = 0; server < numServers; server++) {
518        // Aggregate per-rack locality
519        float locality = getLocalityOfRegion(region, server);
520        int rack = serverIndexToRackIndex[server];
521        int numServersInRack = serversPerRack[rack].length;
522        rackLocalities[region][rack] += locality / numServersInRack;
523
524        if (locality > bestLocalityForRegion) {
525          serverWithBestLocality = server;
526          bestLocalityForRegion = locality;
527        }
528      }
529      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
530
531      // Find most local rack per region
532      int rackWithBestLocality = 0;
533      float bestRackLocalityForRegion = 0.0f;
534      for (int rack = 0; rack < numRacks; rack++) {
535        float rackLocality = rackLocalities[region][rack];
536        if (rackLocality > bestRackLocalityForRegion) {
537          bestRackLocalityForRegion = rackLocality;
538          rackWithBestLocality = rack;
539        }
540      }
541      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
542    }
543
544  }
545
546  /**
547   * Returns the size of hFiles from the most recent RegionLoad for region
548   */
549  public int getTotalRegionHFileSizeMB(int region) {
550    Deque<BalancerRegionLoad> load = regionLoads[region];
551    if (load == null) {
552      // This means, that the region has no actual data on disk
553      return 0;
554    }
555    return regionLoads[region].getLast().getRegionSizeMB();
556  }
557
558  /**
559   * Returns the weighted cache ratio of a region on the given region server
560   */
561  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
562    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
563  }
564
565  /**
566   * Returns the amount by which a region is cached on a given region server. If the region is not
567   * currently hosted on the given region server, then find out if it was previously hosted there
568   * and return the old cache ratio.
569   */
570  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
571    float regionCacheRatio = 0.0f;
572
573    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
574    for (int regionIndex : regionsPerServer[regionServerIndex]) {
575      if (region != regionIndex) {
576        continue;
577      }
578
579      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
580
581      // The region is currently hosted on this region server. Get the region cache ratio for this
582      // region on this server
583      regionCacheRatio =
584        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
585
586      return regionCacheRatio;
587    }
588
589    // Region is not currently hosted on this server. Check if the region was cached on this
590    // server earlier. This can happen when the server was shutdown and the cache was persisted.
591    // Search using the region name and server name and not the index id and server id as these ids
592    // may change when a server is marked as dead or a new server is added.
593    String regionEncodedName = regions[region].getEncodedName();
594    ServerName serverName = servers[regionServerIndex];
595    if (
596      regionCacheRatioOnOldServerMap != null
597        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
598    ) {
599      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
600        regionCacheRatioOnOldServerMap.get(regionEncodedName);
601      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
602        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
603        if (LOG.isDebugEnabled()) {
604          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
605            serverName, regionCacheRatio);
606        }
607      }
608    }
609    return regionCacheRatio;
610  }
611
612  /**
613   * Populate the maps containing information about how much a region is cached on a region server.
614   */
615  private void computeRegionServerRegionCacheRatio() {
616    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
617    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
618
619    for (int region = 0; region < numRegions; region++) {
620      float bestRegionCacheRatio = 0.0f;
621      int serverWithBestRegionCacheRatio = 0;
622      for (int server = 0; server < numServers; server++) {
623        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
624        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
625          // A region with cache ratio 0 on a server means nothing. Hence, just make a note of
626          // cache ratio only if the cache ratio is greater than 0.
627          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
628          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
629        }
630        if (regionCacheRatio > bestRegionCacheRatio) {
631          serverWithBestRegionCacheRatio = server;
632          // If the server currently hosting the region has equal cache ratio to a historical
633          // server, consider the current server to keep hosting the region
634          bestRegionCacheRatio = regionCacheRatio;
635        } else if (
636          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
637        ) {
638          // If two servers have same region cache ratio, then the server currently hosting the
639          // region
640          // should retain the region
641          serverWithBestRegionCacheRatio = server;
642        }
643      }
644      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
645      Pair<Integer, Integer> regionServerPair =
646        new Pair<>(region, regionIndexToServerIndex[region]);
647      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
648      if (tempRegionCacheRatio > bestRegionCacheRatio) {
649        LOG.warn(
650          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
651            + "best region cache ratio {} on server {}",
652          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
653          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
654      }
655    }
656  }
657
658  protected float getOrComputeRegionCacheRatio(int region, int server) {
659    if (
660      regionServerIndexWithBestRegionCachedRatio == null
661        || regionIndexServerIndexRegionCachedRatio.isEmpty()
662    ) {
663      computeRegionServerRegionCacheRatio();
664    }
665
666    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
667    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
668      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
669      : 0.0f;
670  }
671
672  public int[] getOrComputeServerWithBestRegionCachedRatio() {
673    if (
674      regionServerIndexWithBestRegionCachedRatio == null
675        || regionIndexServerIndexRegionCachedRatio.isEmpty()
676    ) {
677      computeRegionServerRegionCacheRatio();
678    }
679    return regionServerIndexWithBestRegionCachedRatio;
680  }
681
682  /**
683   * Maps region index to rack index
684   */
685  public int getRackForRegion(int region) {
686    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
687  }
688
689  enum LocalityType {
690    SERVER,
691    RACK
692  }
693
694  public void doAction(BalanceAction action) {
695    switch (action.getType()) {
696      case NULL:
697        break;
698      case ASSIGN_REGION:
699        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
700        assert action instanceof AssignRegionAction : action.getClass();
701        AssignRegionAction ar = (AssignRegionAction) action;
702        regionsPerServer[ar.getServer()] =
703          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
704        regionMoved(ar.getRegion(), -1, ar.getServer());
705        break;
706      case MOVE_REGION:
707        assert action instanceof MoveRegionAction : action.getClass();
708        MoveRegionAction mra = (MoveRegionAction) action;
709        regionsPerServer[mra.getFromServer()] =
710          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
711        regionsPerServer[mra.getToServer()] =
712          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
713        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
714        break;
715      case SWAP_REGIONS:
716        assert action instanceof SwapRegionsAction : action.getClass();
717        SwapRegionsAction a = (SwapRegionsAction) action;
718        regionsPerServer[a.getFromServer()] =
719          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
720        regionsPerServer[a.getToServer()] =
721          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
722        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
723        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
724        break;
725      default:
726        throw new RuntimeException("Uknown action:" + action.getType());
727    }
728  }
729
730  /**
731   * Return true if the placement of region on server would lower the availability of the region in
732   * question
733   * @return true or false
734   */
735  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
736    if (!serversToIndex.containsKey(serverName.getAddress())) {
737      return false; // safeguard against race between cluster.servers and servers from LB method
738      // args
739    }
740    int server = serversToIndex.get(serverName.getAddress());
741    int region = regionsToIndex.get(regionInfo);
742
743    // Region replicas for same region should better assign to different servers
744    for (int i : regionsPerServer[server]) {
745      RegionInfo otherRegionInfo = regions[i];
746      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
747        return true;
748      }
749    }
750
751    int primary = regionIndexToPrimaryIndex[region];
752    if (primary == -1) {
753      return false;
754    }
755    // there is a subset relation for server < host < rack
756    // check server first
757    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
758    if (result != 0) {
759      return result > 0;
760    }
761
762    // check host
763    if (multiServersPerHost) {
764      result = checkLocationForPrimary(serverIndexToHostIndex[server],
765        colocatedReplicaCountsPerHost, primary);
766      if (result != 0) {
767        return result > 0;
768      }
769    }
770
771    // check rack
772    if (numRacks > 1) {
773      result = checkLocationForPrimary(serverIndexToRackIndex[server],
774        colocatedReplicaCountsPerRack, primary);
775      if (result != 0) {
776        return result > 0;
777      }
778    }
779    return false;
780  }
781
782  /**
783   * Common method for better solution check.
784   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
785   *                                          colocatedReplicaCountsPerRack
786   * @return 1 for better, -1 for no better, 0 for unknown
787   */
788  private int checkLocationForPrimary(int location,
789    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
790    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
791      // check for whether there are other Locations that we can place this region
792      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
793        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
794          return 1; // meaning there is a better Location
795        }
796      }
797      return -1; // there is not a better Location to place this
798    }
799    return 0;
800  }
801
802  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
803    if (!serversToIndex.containsKey(serverName.getAddress())) {
804      return;
805    }
806    int server = serversToIndex.get(serverName.getAddress());
807    int region = regionsToIndex.get(regionInfo);
808    doAction(new AssignRegionAction(region, server));
809  }
810
811  void regionMoved(int region, int oldServer, int newServer) {
812    regionIndexToServerIndex[region] = newServer;
813    if (initialRegionIndexToServerIndex[region] == newServer) {
814      numMovedRegions--; // region moved back to original location
815    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
816      numMovedRegions++; // region moved from original location
817    }
818    int tableIndex = regionIndexToTableIndex[region];
819    if (oldServer >= 0) {
820      numRegionsPerServerPerTable[tableIndex][oldServer]--;
821    }
822    numRegionsPerServerPerTable[tableIndex][newServer]++;
823
824    // update for servers
825    int primary = regionIndexToPrimaryIndex[region];
826    if (oldServer >= 0) {
827      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
828    }
829    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
830
831    // update for hosts
832    if (multiServersPerHost) {
833      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
834        oldServer, newServer, primary, region);
835    }
836
837    // update for racks
838    if (numRacks > 1) {
839      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
840        oldServer, newServer, primary, region);
841    }
842  }
843
844  /**
845   * Common method for per host and per Location region index updates when a region is moved.
846   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
847   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
848   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
849   *                                          colocatedReplicaCountsPerRack
850   */
851  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
852    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
853    int primary, int region) {
854    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
855    int newLocation = serverIndexToLocation[newServer];
856    if (newLocation != oldLocation) {
857      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
858      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
859      if (oldLocation >= 0) {
860        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
861        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
862      }
863    }
864
865  }
866
867  int[] removeRegion(int[] regions, int regionIndex) {
868    // TODO: this maybe costly. Consider using linked lists
869    int[] newRegions = new int[regions.length - 1];
870    int i = 0;
871    for (i = 0; i < regions.length; i++) {
872      if (regions[i] == regionIndex) {
873        break;
874      }
875      newRegions[i] = regions[i];
876    }
877    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
878    return newRegions;
879  }
880
881  int[] addRegion(int[] regions, int regionIndex) {
882    int[] newRegions = new int[regions.length + 1];
883    System.arraycopy(regions, 0, newRegions, 0, regions.length);
884    newRegions[newRegions.length - 1] = regionIndex;
885    return newRegions;
886  }
887
888  int[] addRegionSorted(int[] regions, int regionIndex) {
889    int[] newRegions = new int[regions.length + 1];
890    int i = 0;
891    for (i = 0; i < regions.length; i++) { // find the index to insert
892      if (regions[i] > regionIndex) {
893        break;
894      }
895    }
896    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
897    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
898    newRegions[i] = regionIndex;
899
900    return newRegions;
901  }
902
903  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
904    int i = 0;
905    for (i = 0; i < regions.length; i++) {
906      if (regions[i] == regionIndex) {
907        regions[i] = newRegionIndex;
908        break;
909      }
910    }
911    return regions;
912  }
913
914  void sortServersByRegionCount() {
915    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
916  }
917
918  int getNumRegions(int server) {
919    return regionsPerServer[server].length;
920  }
921
922  public Comparator<Integer> getNumRegionsComparator() {
923    return numRegionsComparator;
924  }
925
926  boolean contains(int[] arr, int val) {
927    return Arrays.binarySearch(arr, val) >= 0;
928  }
929
930  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
931
932  int getLowestLocalityRegionOnServer(int serverIndex) {
933    if (regionFinder != null) {
934      float lowestLocality = 1.0f;
935      int lowestLocalityRegionIndex = -1;
936      if (regionsPerServer[serverIndex].length == 0) {
937        // No regions on that region server
938        return -1;
939      }
940      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
941        int regionIndex = regionsPerServer[serverIndex][j];
942        HDFSBlocksDistribution distribution =
943          regionFinder.getBlockDistribution(regions[regionIndex]);
944        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
945        // skip empty region
946        if (distribution.getUniqueBlocksTotalWeight() == 0) {
947          continue;
948        }
949        if (locality < lowestLocality) {
950          lowestLocality = locality;
951          lowestLocalityRegionIndex = j;
952        }
953      }
954      if (lowestLocalityRegionIndex == -1) {
955        return -1;
956      }
957      if (LOG.isTraceEnabled()) {
958        LOG.trace("Lowest locality region is "
959          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
960            .getRegionNameAsString()
961          + " with locality " + lowestLocality + " and its region server contains "
962          + regionsPerServer[serverIndex].length + " regions");
963      }
964      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
965    } else {
966      return -1;
967    }
968  }
969
970  float getLocalityOfRegion(int region, int server) {
971    if (regionFinder != null) {
972      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
973      return distribution.getBlockLocalityIndex(servers[server].getHostname());
974    } else {
975      return 0f;
976    }
977  }
978
979  void setNumRegions(int numRegions) {
980    this.numRegions = numRegions;
981  }
982
983  void setNumMovedRegions(int numMovedRegions) {
984    this.numMovedRegions = numMovedRegions;
985  }
986
987  @Override
988  public String toString() {
989    StringBuilder desc = new StringBuilder("Cluster={servers=[");
990    for (ServerName sn : servers) {
991      desc.append(sn.getAddress().toString()).append(", ");
992    }
993    desc.append("], serverIndicesSortedByRegionCount=")
994      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
995      .append(Arrays.deepToString(regionsPerServer));
996
997    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
998      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
999      .append('}');
1000    return desc.toString();
1001  }
1002}