001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.Deque;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import org.agrona.collections.Hashing;
030import org.agrona.collections.Int2IntCounterMap;
031import org.apache.hadoop.hbase.HDFSBlocksDistribution;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.client.RegionInfo;
034import org.apache.hadoop.hbase.client.RegionReplicaUtil;
035import org.apache.hadoop.hbase.master.RackManager;
036import org.apache.hadoop.hbase.net.Address;
037import org.apache.hadoop.hbase.util.Pair;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
/**
 * An efficient array based implementation similar to ClusterState for keeping the status of the
 * cluster in terms of region assignment and distribution. LoadBalancers, such as
 * StochasticLoadBalancer, use this Cluster object because hundreds of thousands of hashmap
 * manipulations are very costly, which is why this class mostly uses indexes and arrays.
 * <p/>
 * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server
 * topology in terms of server names, hostnames and racks.
 */
051@InterfaceAudience.Private
052class BalancerClusterState {
053
  private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class);

  // serverIndex -> ServerName. Servers are indexed by address (host and port); when several
  // ServerNames share an address, the one with the newest start code is kept here.
  ServerName[] servers;
  // ServerName uniquely identifies a region server. Multiple RS can run on the same host.
  // hostIndex -> hostname
  String[] hosts;
  // rackIndex -> rack name as reported by the rack manager
  String[] racks;
  boolean multiServersPerHost = false; // whether or not any host has more than one server

  // tableIndex -> table name, in the order tables are first seen while registering regions
  ArrayList<String> tables;
  // regionIndex -> RegionInfo
  RegionInfo[] regions;
  // regionIndex -> load history of the region; an entry is null when no load was supplied for it
  Deque<BalancerRegionLoad>[] regionLoads;
  // Source of block locations for regions; may be null, in which case regionLocations entries
  // are left unset by the constructor
  private RegionHDFSBlockLocationFinder regionFinder;

  // regionIndex -> list of serverIndex sorted by locality (-1 for locations that are not servers
  // known to this cluster state)
  int[][] regionLocations;

  int[] serverIndexToHostIndex; // serverIndex -> host index
  int[] serverIndexToRackIndex; // serverIndex -> rack index

  int[][] regionsPerServer; // serverIndex -> region list
  // serverIndex -> next free slot in regionsPerServer[serverIndex]; used by the constructor so
  // that several ServerNames sharing an address append into the same region list
  int[] serverIndexToRegionsOffset;
  // hostIndex -> list of regions; only populated by the constructor when multiServersPerHost
  int[][] regionsPerHost;
  // rackIndex -> region list; only populated by the constructor when there is more than one rack
  int[][] regionsPerRack;
  Int2IntCounterMap[] colocatedReplicaCountsPerServer; // serverIndex -> counts of colocated
  // replicas by primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerHost; // hostIndex -> counts of colocated replicas by
  // primary region index
  Int2IntCounterMap[] colocatedReplicaCountsPerRack; // rackIndex -> counts of colocated replicas by
  // primary region index

  int[][] serversPerHost; // hostIndex -> list of server indexes
  int[][] serversPerRack; // rackIndex -> list of server indexes
  int[] regionIndexToServerIndex; // regionIndex -> serverIndex, -1 for an unassigned region
  int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state)
  int[] regionIndexToTableIndex; // regionIndex -> tableIndex
  int[][] numRegionsPerServerPerTable; // tableIndex -> serverIndex -> # regions
  int[] numRegionsPerTable; // tableIndex -> region count (assigned regions only)
  double[] meanRegionsPerTable; // tableIndex -> numRegionsPerTable / numServers
  // regionIndex -> regionIndex of the primary; -1 when the primary replica is not part of this
  // cluster state
  int[] regionIndexToPrimaryIndex;
  boolean hasRegionReplicas = false; // whether any registered region is a non-default replica

  // Boxed server indexes; initialised by the constructor to the identity order (entry i == i)
  Integer[] serverIndicesSortedByRegionCount;
  Integer[] serverIndicesSortedByLocality;

  // Lookup maps from the natural key to the index used by the arrays above
  Map<Address, Integer> serversToIndex;
  Map<String, Integer> hostsToIndex;
  Map<String, Integer> racksToIndex;
  Map<String, Integer> tablesToIndex;
  Map<RegionInfo, Integer> regionsToIndex;
  // serverIndex -> locality; allocated by the constructor but not filled in there
  float[] localityPerServer;

  int numServers;
  int numHosts;
  int numRacks;
  int numTables;
  int numRegions;
  // Highest (replicaId + 1) seen among the registered regions; never below 1
  int maxReplicas = 1;

  int numMovedRegions = 0; // num moved regions from the initial configuration
  // The assignment map this state was built from (kept by reference, not copied)
  Map<ServerName, List<RegionInfo>> clusterState;

  // Never null: falls back to DefaultRackManager when the caller passes none
  private final RackManager rackManager;
  // Maps region -> rackIndex -> locality of region on rack
  private float[][] rackLocalities;
  // Maps localityType -> region -> [server|rack]Index with highest locality.
  // The first dimension is indexed by LocalityType.ordinal().
  private int[][] regionsToMostLocalEntities;
  // Maps (regionIndex, serverIndex) -> regionCacheRatio of the region on that server
  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
  // Maps regionIndex -> serverIndex with best region cache ratio
  private int[] regionServerIndexWithBestRegionCachedRatio;
  // Maps encoded region name -> (old ServerName, cache ratio of the region on that old server);
  // may be null when no historical cache ratios were supplied
  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
125
126  static class DefaultRackManager extends RackManager {
127    @Override
128    public String getRack(ServerName server) {
129      return UNKNOWN_RACK;
130    }
131  }
132
  /**
   * Creates a cluster state without unassigned regions and without historical region cache
   * ratios; delegates to the full constructor, passing {@code null} for both.
   * @param clusterState current assignment, server -> regions hosted on it
   * @param loads        region load history keyed by region name (or encoded name); may be null
   * @param regionFinder source of block locations for regions; may be null
   * @param rackManager  resolves the rack of a server; when null a {@link DefaultRackManager} is
   *                     used
   */
  BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
    RackManager rackManager) {
    this(null, clusterState, loads, regionFinder, rackManager, null);
  }
138
  /**
   * Creates a cluster state without unassigned regions but with historical region cache ratios;
   * delegates to the full constructor, passing {@code null} for the unassigned regions.
   * @param clusterState                    current assignment, server -> regions hosted on it
   * @param loads                           region load history keyed by region name (or encoded
   *                                        name); may be null
   * @param regionFinder                    source of block locations for regions; may be null
   * @param rackManager                     resolves the rack of a server; when null a
   *                                        {@link DefaultRackManager} is used
   * @param oldRegionServerRegionCacheRatio encoded region name -> (server that previously hosted
   *                                        the region, cache ratio on that server); may be null
   */
  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
  }
144
145  @SuppressWarnings("unchecked")
146  BalancerClusterState(Collection<RegionInfo> unassignedRegions,
147    Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
148    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
149    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
150    if (unassignedRegions == null) {
151      unassignedRegions = Collections.emptyList();
152    }
153
154    serversToIndex = new HashMap<>();
155    hostsToIndex = new HashMap<>();
156    racksToIndex = new HashMap<>();
157    tablesToIndex = new HashMap<>();
158
159    // TODO: We should get the list of tables from master
160    tables = new ArrayList<>();
161    this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
162
163    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
164
165    numRegions = 0;
166
167    List<List<Integer>> serversPerHostList = new ArrayList<>();
168    List<List<Integer>> serversPerRackList = new ArrayList<>();
169    this.clusterState = clusterState;
170    this.regionFinder = regionFinder;
171
172    // Use servername and port as there can be dead servers in this list. We want everything with
173    // a matching hostname and port to have the same index.
174    for (ServerName sn : clusterState.keySet()) {
175      if (sn == null) {
176        LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); "
177          + "skipping; unassigned regions?");
178        if (LOG.isTraceEnabled()) {
179          LOG.trace("EMPTY SERVERNAME " + clusterState.toString());
180        }
181        continue;
182      }
183      if (serversToIndex.get(sn.getAddress()) == null) {
184        serversToIndex.put(sn.getAddress(), numServers++);
185      }
186      if (!hostsToIndex.containsKey(sn.getHostname())) {
187        hostsToIndex.put(sn.getHostname(), numHosts++);
188        serversPerHostList.add(new ArrayList<>(1));
189      }
190
191      int serverIndex = serversToIndex.get(sn.getAddress());
192      int hostIndex = hostsToIndex.get(sn.getHostname());
193      serversPerHostList.get(hostIndex).add(serverIndex);
194
195      String rack = this.rackManager.getRack(sn);
196
197      if (!racksToIndex.containsKey(rack)) {
198        racksToIndex.put(rack, numRacks++);
199        serversPerRackList.add(new ArrayList<>());
200      }
201      int rackIndex = racksToIndex.get(rack);
202      serversPerRackList.get(rackIndex).add(serverIndex);
203    }
204
205    LOG.debug("Hosts are {} racks are {}", hostsToIndex, racksToIndex);
206    // Count how many regions there are.
207    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
208      numRegions += entry.getValue().size();
209    }
210    numRegions += unassignedRegions.size();
211
212    regionsToIndex = new HashMap<>(numRegions);
213    servers = new ServerName[numServers];
214    serversPerHost = new int[numHosts][];
215    serversPerRack = new int[numRacks][];
216    regions = new RegionInfo[numRegions];
217    regionIndexToServerIndex = new int[numRegions];
218    initialRegionIndexToServerIndex = new int[numRegions];
219    regionIndexToTableIndex = new int[numRegions];
220    regionIndexToPrimaryIndex = new int[numRegions];
221    regionLoads = new Deque[numRegions];
222
223    regionLocations = new int[numRegions][];
224    serverIndicesSortedByRegionCount = new Integer[numServers];
225    serverIndicesSortedByLocality = new Integer[numServers];
226    localityPerServer = new float[numServers];
227
228    serverIndexToHostIndex = new int[numServers];
229    serverIndexToRackIndex = new int[numServers];
230    regionsPerServer = new int[numServers][];
231    serverIndexToRegionsOffset = new int[numServers];
232    regionsPerHost = new int[numHosts][];
233    regionsPerRack = new int[numRacks][];
234    colocatedReplicaCountsPerServer = new Int2IntCounterMap[numServers];
235    colocatedReplicaCountsPerHost = new Int2IntCounterMap[numHosts];
236    colocatedReplicaCountsPerRack = new Int2IntCounterMap[numRacks];
237
238    int regionIndex = 0, regionPerServerIndex = 0;
239
240    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
241      if (entry.getKey() == null) {
242        LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue());
243        continue;
244      }
245      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
246
247      // keep the servername if this is the first server name for this hostname
248      // or this servername has the newest startcode.
249      if (
250        servers[serverIndex] == null
251          || servers[serverIndex].getStartcode() < entry.getKey().getStartcode()
252      ) {
253        servers[serverIndex] = entry.getKey();
254      }
255
256      if (regionsPerServer[serverIndex] != null) {
257        // there is another server with the same hostAndPort in ClusterState.
258        // allocate the array for the total size
259        regionsPerServer[serverIndex] =
260          new int[entry.getValue().size() + regionsPerServer[serverIndex].length];
261      } else {
262        regionsPerServer[serverIndex] = new int[entry.getValue().size()];
263      }
264      colocatedReplicaCountsPerServer[serverIndex] =
265        new Int2IntCounterMap(regionsPerServer[serverIndex].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
266      serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
267      serverIndicesSortedByLocality[serverIndex] = serverIndex;
268    }
269
270    hosts = new String[numHosts];
271    for (Map.Entry<String, Integer> entry : hostsToIndex.entrySet()) {
272      hosts[entry.getValue()] = entry.getKey();
273    }
274    racks = new String[numRacks];
275    for (Map.Entry<String, Integer> entry : racksToIndex.entrySet()) {
276      racks[entry.getValue()] = entry.getKey();
277    }
278
279    for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
280      int serverIndex = serversToIndex.get(entry.getKey().getAddress());
281      regionPerServerIndex = serverIndexToRegionsOffset[serverIndex];
282
283      int hostIndex = hostsToIndex.get(entry.getKey().getHostname());
284      serverIndexToHostIndex[serverIndex] = hostIndex;
285
286      int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey()));
287      serverIndexToRackIndex[serverIndex] = rackIndex;
288
289      for (RegionInfo region : entry.getValue()) {
290        registerRegion(region, regionIndex, serverIndex, loads, regionFinder);
291        regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
292        regionIndex++;
293      }
294      serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex;
295    }
296
297    for (RegionInfo region : unassignedRegions) {
298      registerRegion(region, regionIndex, -1, loads, regionFinder);
299      regionIndex++;
300    }
301
302    if (LOG.isDebugEnabled()) {
303      for (int i = 0; i < numServers; i++) {
304        LOG.debug("server {} has {} regions", i, regionsPerServer[i].length);
305      }
306    }
307    for (int i = 0; i < serversPerHostList.size(); i++) {
308      serversPerHost[i] = new int[serversPerHostList.get(i).size()];
309      for (int j = 0; j < serversPerHost[i].length; j++) {
310        serversPerHost[i][j] = serversPerHostList.get(i).get(j);
311        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
312      }
313      if (serversPerHost[i].length > 1) {
314        multiServersPerHost = true;
315      }
316    }
317
318    for (int i = 0; i < serversPerRackList.size(); i++) {
319      serversPerRack[i] = new int[serversPerRackList.get(i).size()];
320      for (int j = 0; j < serversPerRack[i].length; j++) {
321        serversPerRack[i][j] = serversPerRackList.get(i).get(j);
322        LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
323      }
324    }
325
326    numTables = tables.size();
327    LOG.debug("Number of tables={}, number of hosts={}, number of racks={}", numTables, numHosts,
328      numRacks);
329    numRegionsPerServerPerTable = new int[numTables][numServers];
330    numRegionsPerTable = new int[numTables];
331
332    for (int i = 0; i < numTables; i++) {
333      for (int j = 0; j < numServers; j++) {
334        numRegionsPerServerPerTable[i][j] = 0;
335      }
336    }
337
338    for (int i = 0; i < regionIndexToServerIndex.length; i++) {
339      if (regionIndexToServerIndex[i] >= 0) {
340        numRegionsPerServerPerTable[regionIndexToTableIndex[i]][regionIndexToServerIndex[i]]++;
341        numRegionsPerTable[regionIndexToTableIndex[i]]++;
342      }
343    }
344
345    // Avoid repeated computation for planning
346    meanRegionsPerTable = new double[numTables];
347
348    for (int i = 0; i < numTables; i++) {
349      meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
350    }
351
352    for (int i = 0; i < regions.length; i++) {
353      RegionInfo info = regions[i];
354      if (RegionReplicaUtil.isDefaultReplica(info)) {
355        regionIndexToPrimaryIndex[i] = i;
356      } else {
357        hasRegionReplicas = true;
358        RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info);
359        regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1);
360      }
361    }
362
363    for (int i = 0; i < regionsPerServer.length; i++) {
364      colocatedReplicaCountsPerServer[i] =
365        new Int2IntCounterMap(regionsPerServer[i].length, Hashing.DEFAULT_LOAD_FACTOR, 0);
366      for (int j = 0; j < regionsPerServer[i].length; j++) {
367        int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]];
368        colocatedReplicaCountsPerServer[i].getAndIncrement(primaryIndex);
369      }
370    }
371    // compute regionsPerHost
372    if (multiServersPerHost) {
373      populateRegionPerLocationFromServer(regionsPerHost, colocatedReplicaCountsPerHost,
374        serversPerHost);
375    }
376
377    // compute regionsPerRack
378    if (numRacks > 1) {
379      populateRegionPerLocationFromServer(regionsPerRack, colocatedReplicaCountsPerRack,
380        serversPerRack);
381    }
382  }
383
384  private void populateRegionPerLocationFromServer(int[][] regionsPerLocation,
385    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int[][] serversPerLocation) {
386    for (int i = 0; i < serversPerLocation.length; i++) {
387      int numRegionsPerLocation = 0;
388      for (int j = 0; j < serversPerLocation[i].length; j++) {
389        numRegionsPerLocation += regionsPerServer[serversPerLocation[i][j]].length;
390      }
391      regionsPerLocation[i] = new int[numRegionsPerLocation];
392      colocatedReplicaCountsPerLocation[i] =
393        new Int2IntCounterMap(numRegionsPerLocation, Hashing.DEFAULT_LOAD_FACTOR, 0);
394    }
395
396    for (int i = 0; i < serversPerLocation.length; i++) {
397      int numRegionPerLocationIndex = 0;
398      for (int j = 0; j < serversPerLocation[i].length; j++) {
399        for (int k = 0; k < regionsPerServer[serversPerLocation[i][j]].length; k++) {
400          int region = regionsPerServer[serversPerLocation[i][j]][k];
401          regionsPerLocation[i][numRegionPerLocationIndex] = region;
402          int primaryIndex = regionIndexToPrimaryIndex[region];
403          colocatedReplicaCountsPerLocation[i].getAndIncrement(primaryIndex);
404          numRegionPerLocationIndex++;
405        }
406      }
407    }
408
409  }
410
411  /** Helper for Cluster constructor to handle a region */
412  private void registerRegion(RegionInfo region, int regionIndex, int serverIndex,
413    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder) {
414    String tableName = region.getTable().getNameAsString();
415    if (!tablesToIndex.containsKey(tableName)) {
416      tables.add(tableName);
417      tablesToIndex.put(tableName, tablesToIndex.size());
418    }
419    int tableIndex = tablesToIndex.get(tableName);
420
421    regionsToIndex.put(region, regionIndex);
422    regions[regionIndex] = region;
423    regionIndexToServerIndex[regionIndex] = serverIndex;
424    initialRegionIndexToServerIndex[regionIndex] = serverIndex;
425    regionIndexToTableIndex[regionIndex] = tableIndex;
426
427    // region load
428    if (loads != null) {
429      Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
430      // That could have failed if the RegionLoad is using the other regionName
431      if (rl == null) {
432        // Try getting the region load using encoded name.
433        rl = loads.get(region.getEncodedName());
434      }
435      regionLoads[regionIndex] = rl;
436    }
437
438    if (regionFinder != null) {
439      // region location
440      List<ServerName> loc = regionFinder.getTopBlockLocations(region);
441      regionLocations[regionIndex] = new int[loc.size()];
442      for (int i = 0; i < loc.size(); i++) {
443        regionLocations[regionIndex][i] = loc.get(i) == null
444          ? -1
445          : (serversToIndex.get(loc.get(i).getAddress()) == null
446            ? -1
447            : serversToIndex.get(loc.get(i).getAddress()));
448      }
449    }
450
451    int numReplicas = region.getReplicaId() + 1;
452    if (numReplicas > maxReplicas) {
453      maxReplicas = numReplicas;
454    }
455  }
456
457  /**
458   * Returns true iff a given server has less regions than the balanced amount
459   */
460  public boolean serverHasTooFewRegions(int server) {
461    int minLoad = this.numRegions / numServers;
462    int numRegions = getNumRegions(server);
463    return numRegions < minLoad;
464  }
465
466  /**
467   * Retrieves and lazily initializes a field storing the locality of every region/server
468   * combination
469   */
470  public float[][] getOrComputeRackLocalities() {
471    if (rackLocalities == null || regionsToMostLocalEntities == null) {
472      computeCachedLocalities();
473    }
474    return rackLocalities;
475  }
476
477  /**
478   * Lazily initializes and retrieves a mapping of region -> server for which region has the highest
479   * the locality
480   */
481  public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) {
482    if (rackLocalities == null || regionsToMostLocalEntities == null) {
483      computeCachedLocalities();
484    }
485    return regionsToMostLocalEntities[type.ordinal()];
486  }
487
488  /**
489   * Looks up locality from cache of localities. Will create cache if it does not already exist.
490   */
491  public float getOrComputeLocality(int region, int entity,
492    BalancerClusterState.LocalityType type) {
493    switch (type) {
494      case SERVER:
495        return getLocalityOfRegion(region, entity);
496      case RACK:
497        return getOrComputeRackLocalities()[region][entity];
498      default:
499        throw new IllegalArgumentException("Unsupported LocalityType: " + type);
500    }
501  }
502
503  /**
504   * Returns locality weighted by region size in MB. Will create locality cache if it does not
505   * already exist.
506   */
507  public double getOrComputeWeightedLocality(int region, int server,
508    BalancerClusterState.LocalityType type) {
509    return getRegionSizeMB(region) * getOrComputeLocality(region, server, type);
510  }
511
512  /**
513   * Returns the size in MB from the most recent RegionLoad for region
514   */
515  public int getRegionSizeMB(int region) {
516    Deque<BalancerRegionLoad> load = regionLoads[region];
517    // This means regions have no actual data on disk
518    if (load == null) {
519      return 0;
520    }
521    return regionLoads[region].getLast().getStorefileSizeMB();
522  }
523
524  /**
525   * Computes and caches the locality for each region/rack combinations, as well as storing a
526   * mapping of region -> server and region -> rack such that server and rack have the highest
527   * locality for region
528   */
529  private void computeCachedLocalities() {
530    rackLocalities = new float[numRegions][numRacks];
531    regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions];
532
533    // Compute localities and find most local server per region
534    for (int region = 0; region < numRegions; region++) {
535      int serverWithBestLocality = 0;
536      float bestLocalityForRegion = 0;
537      for (int server = 0; server < numServers; server++) {
538        // Aggregate per-rack locality
539        float locality = getLocalityOfRegion(region, server);
540        int rack = serverIndexToRackIndex[server];
541        int numServersInRack = serversPerRack[rack].length;
542        rackLocalities[region][rack] += locality / numServersInRack;
543
544        if (locality > bestLocalityForRegion) {
545          serverWithBestLocality = server;
546          bestLocalityForRegion = locality;
547        }
548      }
549      regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality;
550
551      // Find most local rack per region
552      int rackWithBestLocality = 0;
553      float bestRackLocalityForRegion = 0.0f;
554      for (int rack = 0; rack < numRacks; rack++) {
555        float rackLocality = rackLocalities[region][rack];
556        if (rackLocality > bestRackLocalityForRegion) {
557          bestRackLocalityForRegion = rackLocality;
558          rackWithBestLocality = rack;
559        }
560      }
561      regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality;
562    }
563
564  }
565
566  /**
567   * Returns the size of hFiles from the most recent RegionLoad for region
568   */
569  public int getTotalRegionHFileSizeMB(int region) {
570    Deque<BalancerRegionLoad> load = regionLoads[region];
571    if (load == null) {
572      // This means, that the region has no actual data on disk
573      return 0;
574    }
575    return regionLoads[region].getLast().getRegionSizeMB();
576  }
577
578  /**
579   * Returns the weighted cache ratio of a region on the given region server
580   */
581  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
582    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
583  }
584
585  /**
586   * Returns the amount by which a region is cached on a given region server. If the region is not
587   * currently hosted on the given region server, then find out if it was previously hosted there
588   * and return the old cache ratio.
589   */
590  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
591    float regionCacheRatio = 0.0f;
592
593    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
594    for (int regionIndex : regionsPerServer[regionServerIndex]) {
595      if (region != regionIndex) {
596        continue;
597      }
598
599      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
600
601      // The region is currently hosted on this region server. Get the region cache ratio for this
602      // region on this server
603      regionCacheRatio =
604        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
605
606      return regionCacheRatio;
607    }
608
609    // Region is not currently hosted on this server. Check if the region was cached on this
610    // server earlier. This can happen when the server was shutdown and the cache was persisted.
611    // Search using the region name and server name and not the index id and server id as these ids
612    // may change when a server is marked as dead or a new server is added.
613    String regionEncodedName = regions[region].getEncodedName();
614    ServerName serverName = servers[regionServerIndex];
615    if (
616      regionCacheRatioOnOldServerMap != null
617        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
618    ) {
619      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
620        regionCacheRatioOnOldServerMap.get(regionEncodedName);
621      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
622        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
623        if (LOG.isDebugEnabled()) {
624          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
625            serverName, regionCacheRatio);
626        }
627      }
628    }
629    return regionCacheRatio;
630  }
631
632  /**
633   * Populate the maps containing information about how much a region is cached on a region server.
634   */
635  private void computeRegionServerRegionCacheRatio() {
636    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
637    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
638
639    for (int region = 0; region < numRegions; region++) {
640      float bestRegionCacheRatio = 0.0f;
641      int serverWithBestRegionCacheRatio = 0;
642      for (int server = 0; server < numServers; server++) {
643        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
644        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
645          // A region with cache ratio 0 on a server means nothing. Hence, just make a note of
646          // cache ratio only if the cache ratio is greater than 0.
647          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
648          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
649        }
650        if (regionCacheRatio > bestRegionCacheRatio) {
651          serverWithBestRegionCacheRatio = server;
652          // If the server currently hosting the region has equal cache ratio to a historical
653          // server, consider the current server to keep hosting the region
654          bestRegionCacheRatio = regionCacheRatio;
655        } else if (
656          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
657        ) {
658          // If two servers have same region cache ratio, then the server currently hosting the
659          // region
660          // should retain the region
661          serverWithBestRegionCacheRatio = server;
662        }
663      }
664      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
665      Pair<Integer, Integer> regionServerPair =
666        new Pair<>(region, regionIndexToServerIndex[region]);
667      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
668      if (tempRegionCacheRatio > bestRegionCacheRatio) {
669        LOG.warn(
670          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
671            + "best region cache ratio {} on server {}",
672          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
673          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
674      }
675    }
676  }
677
678  protected float getOrComputeRegionCacheRatio(int region, int server) {
679    if (
680      regionServerIndexWithBestRegionCachedRatio == null
681        || regionIndexServerIndexRegionCachedRatio.isEmpty()
682    ) {
683      computeRegionServerRegionCacheRatio();
684    }
685
686    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
687    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
688      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
689      : 0.0f;
690  }
691
692  public int[] getOrComputeServerWithBestRegionCachedRatio() {
693    if (
694      regionServerIndexWithBestRegionCachedRatio == null
695        || regionIndexServerIndexRegionCachedRatio.isEmpty()
696    ) {
697      computeRegionServerRegionCacheRatio();
698    }
699    return regionServerIndexWithBestRegionCachedRatio;
700  }
701
702  /**
703   * Maps region index to rack index
704   */
705  public int getRackForRegion(int region) {
706    return serverIndexToRackIndex[regionIndexToServerIndex[region]];
707  }
708
  /**
   * Kind of entity a locality is measured against. The ordinal of each constant is used as the
   * first index of {@code regionsToMostLocalEntities}.
   */
  enum LocalityType {
    /** Locality of a region on a single region server. */
    SERVER,
    /** Locality of a region on a rack. */
    RACK
  }
713
714  public void doAction(BalanceAction action) {
715    switch (action.getType()) {
716      case NULL:
717        break;
718      case ASSIGN_REGION:
719        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
720        assert action instanceof AssignRegionAction : action.getClass();
721        AssignRegionAction ar = (AssignRegionAction) action;
722        regionsPerServer[ar.getServer()] =
723          addRegion(regionsPerServer[ar.getServer()], ar.getRegion());
724        regionMoved(ar.getRegion(), -1, ar.getServer());
725        break;
726      case MOVE_REGION:
727        assert action instanceof MoveRegionAction : action.getClass();
728        MoveRegionAction mra = (MoveRegionAction) action;
729        regionsPerServer[mra.getFromServer()] =
730          removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion());
731        regionsPerServer[mra.getToServer()] =
732          addRegion(regionsPerServer[mra.getToServer()], mra.getRegion());
733        regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer());
734        break;
735      case SWAP_REGIONS:
736        assert action instanceof SwapRegionsAction : action.getClass();
737        SwapRegionsAction a = (SwapRegionsAction) action;
738        regionsPerServer[a.getFromServer()] =
739          replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion());
740        regionsPerServer[a.getToServer()] =
741          replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion());
742        regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
743        regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
744        break;
745      default:
746        throw new RuntimeException("Uknown action:" + action.getType());
747    }
748  }
749
750  /**
751   * Return true if the placement of region on server would lower the availability of the region in
752   * question
753   * @return true or false
754   */
755  boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
756    if (!serversToIndex.containsKey(serverName.getAddress())) {
757      return false; // safeguard against race between cluster.servers and servers from LB method
758      // args
759    }
760    int server = serversToIndex.get(serverName.getAddress());
761    int region = regionsToIndex.get(regionInfo);
762
763    // Region replicas for same region should better assign to different servers
764    for (int i : regionsPerServer[server]) {
765      RegionInfo otherRegionInfo = regions[i];
766      if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
767        return true;
768      }
769    }
770
771    int primary = regionIndexToPrimaryIndex[region];
772    if (primary == -1) {
773      return false;
774    }
775    // there is a subset relation for server < host < rack
776    // check server first
777    int result = checkLocationForPrimary(server, colocatedReplicaCountsPerServer, primary);
778    if (result != 0) {
779      return result > 0;
780    }
781
782    // check host
783    if (multiServersPerHost) {
784      result = checkLocationForPrimary(serverIndexToHostIndex[server],
785        colocatedReplicaCountsPerHost, primary);
786      if (result != 0) {
787        return result > 0;
788      }
789    }
790
791    // check rack
792    if (numRacks > 1) {
793      result = checkLocationForPrimary(serverIndexToRackIndex[server],
794        colocatedReplicaCountsPerRack, primary);
795      if (result != 0) {
796        return result > 0;
797      }
798    }
799    return false;
800  }
801
802  /**
803   * Common method for better solution check.
804   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
805   *                                          colocatedReplicaCountsPerRack
806   * @return 1 for better, -1 for no better, 0 for unknown
807   */
808  private int checkLocationForPrimary(int location,
809    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int primary) {
810    if (colocatedReplicaCountsPerLocation[location].containsKey(primary)) {
811      // check for whether there are other Locations that we can place this region
812      for (int i = 0; i < colocatedReplicaCountsPerLocation.length; i++) {
813        if (i != location && !colocatedReplicaCountsPerLocation[i].containsKey(primary)) {
814          return 1; // meaning there is a better Location
815        }
816      }
817      return -1; // there is not a better Location to place this
818    }
819    return 0;
820  }
821
822  void doAssignRegion(RegionInfo regionInfo, ServerName serverName) {
823    if (!serversToIndex.containsKey(serverName.getAddress())) {
824      return;
825    }
826    int server = serversToIndex.get(serverName.getAddress());
827    int region = regionsToIndex.get(regionInfo);
828    doAction(new AssignRegionAction(region, server));
829  }
830
831  void regionMoved(int region, int oldServer, int newServer) {
832    regionIndexToServerIndex[region] = newServer;
833    if (initialRegionIndexToServerIndex[region] == newServer) {
834      numMovedRegions--; // region moved back to original location
835    } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) {
836      numMovedRegions++; // region moved from original location
837    }
838    int tableIndex = regionIndexToTableIndex[region];
839    if (oldServer >= 0) {
840      numRegionsPerServerPerTable[tableIndex][oldServer]--;
841    }
842    numRegionsPerServerPerTable[tableIndex][newServer]++;
843
844    // update for servers
845    int primary = regionIndexToPrimaryIndex[region];
846    if (oldServer >= 0) {
847      colocatedReplicaCountsPerServer[oldServer].getAndDecrement(primary);
848    }
849    colocatedReplicaCountsPerServer[newServer].getAndIncrement(primary);
850
851    // update for hosts
852    if (multiServersPerHost) {
853      updateForLocation(serverIndexToHostIndex, regionsPerHost, colocatedReplicaCountsPerHost,
854        oldServer, newServer, primary, region);
855    }
856
857    // update for racks
858    if (numRacks > 1) {
859      updateForLocation(serverIndexToRackIndex, regionsPerRack, colocatedReplicaCountsPerRack,
860        oldServer, newServer, primary, region);
861    }
862  }
863
864  /**
865   * Common method for per host and per Location region index updates when a region is moved.
866   * @param serverIndexToLocation             serverIndexToHostIndex or serverIndexToLocationIndex
867   * @param regionsPerLocation                regionsPerHost or regionsPerLocation
868   * @param colocatedReplicaCountsPerLocation colocatedReplicaCountsPerHost or
869   *                                          colocatedReplicaCountsPerRack
870   */
871  private void updateForLocation(int[] serverIndexToLocation, int[][] regionsPerLocation,
872    Int2IntCounterMap[] colocatedReplicaCountsPerLocation, int oldServer, int newServer,
873    int primary, int region) {
874    int oldLocation = oldServer >= 0 ? serverIndexToLocation[oldServer] : -1;
875    int newLocation = serverIndexToLocation[newServer];
876    if (newLocation != oldLocation) {
877      regionsPerLocation[newLocation] = addRegion(regionsPerLocation[newLocation], region);
878      colocatedReplicaCountsPerLocation[newLocation].getAndIncrement(primary);
879      if (oldLocation >= 0) {
880        regionsPerLocation[oldLocation] = removeRegion(regionsPerLocation[oldLocation], region);
881        colocatedReplicaCountsPerLocation[oldLocation].getAndDecrement(primary);
882      }
883    }
884
885  }
886
887  int[] removeRegion(int[] regions, int regionIndex) {
888    // TODO: this maybe costly. Consider using linked lists
889    int[] newRegions = new int[regions.length - 1];
890    int i = 0;
891    for (i = 0; i < regions.length; i++) {
892      if (regions[i] == regionIndex) {
893        break;
894      }
895      newRegions[i] = regions[i];
896    }
897    System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i);
898    return newRegions;
899  }
900
901  int[] addRegion(int[] regions, int regionIndex) {
902    int[] newRegions = new int[regions.length + 1];
903    System.arraycopy(regions, 0, newRegions, 0, regions.length);
904    newRegions[newRegions.length - 1] = regionIndex;
905    return newRegions;
906  }
907
908  int[] addRegionSorted(int[] regions, int regionIndex) {
909    int[] newRegions = new int[regions.length + 1];
910    int i = 0;
911    for (i = 0; i < regions.length; i++) { // find the index to insert
912      if (regions[i] > regionIndex) {
913        break;
914      }
915    }
916    System.arraycopy(regions, 0, newRegions, 0, i); // copy first half
917    System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half
918    newRegions[i] = regionIndex;
919
920    return newRegions;
921  }
922
923  int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) {
924    int i = 0;
925    for (i = 0; i < regions.length; i++) {
926      if (regions[i] == regionIndex) {
927        regions[i] = newRegionIndex;
928        break;
929      }
930    }
931    return regions;
932  }
933
  /**
   * Sorts {@code serverIndicesSortedByRegionCount} in place using {@code numRegionsComparator}.
   */
  void sortServersByRegionCount() {
    Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
  }
937
  /**
   * Returns the number of regions listed for the given server in {@code regionsPerServer}.
   * @param server index of the server
   * @return length of {@code regionsPerServer[server]}
   */
  int getNumRegions(int server) {
    return regionsPerServer[server].length;
  }
941
  /**
   * Tests whether {@code val} occurs in {@code arr} using a binary search.
   * <p>
   * {@link Arrays#binarySearch(int[], int)} requires {@code arr} to be sorted in ascending order;
   * for an unsorted array the result is undefined. NOTE(review): {@code addRegion} appends and
   * {@code replaceRegion} overwrites without preserving order, so callers must only pass arrays
   * that are known to be kept sorted -- confirm against call sites.
   * @param arr ascending-sorted array to search
   * @param val value to look for
   * @return {@code true} if the binary search finds {@code val}
   */
  boolean contains(int[] arr, int val) {
    return Arrays.binarySearch(arr, val) >= 0;
  }
945
  /** Orders server indices by ascending value of {@link #getNumRegions(int)}. */
  private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
947
  /**
   * Returns the comparator that orders server indices by ascending value of
   * {@link #getNumRegions(int)}.
   * @return the {@code numRegionsComparator} instance held by this object
   */
  public Comparator<Integer> getNumRegionsComparator() {
    return numRegionsComparator;
  }
951
952  int getLowestLocalityRegionOnServer(int serverIndex) {
953    if (regionFinder != null) {
954      float lowestLocality = 1.0f;
955      int lowestLocalityRegionIndex = -1;
956      if (regionsPerServer[serverIndex].length == 0) {
957        // No regions on that region server
958        return -1;
959      }
960      for (int j = 0; j < regionsPerServer[serverIndex].length; j++) {
961        int regionIndex = regionsPerServer[serverIndex][j];
962        HDFSBlocksDistribution distribution =
963          regionFinder.getBlockDistribution(regions[regionIndex]);
964        float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname());
965        // skip empty region
966        if (distribution.getUniqueBlocksTotalWeight() == 0) {
967          continue;
968        }
969        if (locality < lowestLocality) {
970          lowestLocality = locality;
971          lowestLocalityRegionIndex = j;
972        }
973      }
974      if (lowestLocalityRegionIndex == -1) {
975        return -1;
976      }
977      if (LOG.isTraceEnabled()) {
978        LOG.trace("Lowest locality region is "
979          + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]]
980            .getRegionNameAsString()
981          + " with locality " + lowestLocality + " and its region server contains "
982          + regionsPerServer[serverIndex].length + " regions");
983      }
984      return regionsPerServer[serverIndex][lowestLocalityRegionIndex];
985    } else {
986      return -1;
987    }
988  }
989
990  float getLocalityOfRegion(int region, int server) {
991    if (regionFinder != null) {
992      HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]);
993      return distribution.getBlockLocalityIndex(servers[server].getHostname());
994    } else {
995      return 0f;
996    }
997  }
998
  /**
   * Overwrites the {@code numRegions} field. No other state of this object is touched.
   * @param numRegions the new value
   */
  void setNumRegions(int numRegions) {
    this.numRegions = numRegions;
  }
1002
  /**
   * Overwrites the {@code numMovedRegions} counter. No other state of this object is touched.
   * @param numMovedRegions the new value
   */
  void setNumMovedRegions(int numMovedRegions) {
    this.numMovedRegions = numMovedRegions;
  }
1006
1007  @Override
1008  public String toString() {
1009    StringBuilder desc = new StringBuilder("Cluster={servers=[");
1010    for (ServerName sn : servers) {
1011      desc.append(sn.getAddress().toString()).append(", ");
1012    }
1013    desc.append("], serverIndicesSortedByRegionCount=")
1014      .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=")
1015      .append(Arrays.deepToString(regionsPerServer));
1016
1017    desc.append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
1018      .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions)
1019      .append('}');
1020    return desc.toString();
1021  }
1022}