001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020import java.util.concurrent.atomic.AtomicLong;
021import org.agrona.collections.Hashing;
022import org.agrona.collections.Int2IntCounterMap;
023import org.apache.yetus.audience.InterfaceAudience;
024
025/**
026 * A cost function for region replicas. We give a high cost for hosting replicas of the same region
027 * in the same server, host or rack. We do not prevent the case though, since if numReplicas >
028 * numRegionServers, we still want to keep the replica open.
029 */
030@InterfaceAudience.Private
031abstract class RegionReplicaGroupingCostFunction extends CostFunction {
032  protected long maxCost = 0;
033  protected long[] costsPerGroup; // group is either server, host or rack
034
035  @Override
036  final void prepare(BalancerClusterState cluster) {
037    super.prepare(cluster);
038    if (!isNeeded()) {
039      return;
040    }
041    loadCosts();
042  }
043
044  protected abstract void loadCosts();
045
046  protected final long getMaxCost(BalancerClusterState cluster) {
047    // max cost is the case where every region replica is hosted together regardless of host
048    Int2IntCounterMap colocatedReplicaCounts =
049      new Int2IntCounterMap(cluster.numRegions, Hashing.DEFAULT_LOAD_FACTOR, 0);
050    for (int i = 0; i < cluster.regionIndexToPrimaryIndex.length; i++) {
051      colocatedReplicaCounts.getAndIncrement(cluster.regionIndexToPrimaryIndex[i]);
052    }
053    // compute numReplicas from the sorted array
054    return costPerGroup(colocatedReplicaCounts);
055  }
056
057  @Override
058  boolean isNeeded() {
059    return cluster.hasRegionReplicas;
060  }
061
062  @Override
063  protected double cost() {
064    if (maxCost <= 0) {
065      return 0;
066    }
067
068    long totalCost = 0;
069    for (int i = 0; i < costsPerGroup.length; i++) {
070      totalCost += costsPerGroup[i];
071    }
072    return scale(0, maxCost, totalCost);
073  }
074
075  /**
076   * For each primary region, it computes the total number of replicas in the array (numReplicas)
077   * and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
078   * d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
079   * + (3-1) * (3-1) + (1-1) * (1-1).
080   * @param colocatedReplicaCounts a sorted array of primary regions ids for the regions hosted
081   * @return a sum of numReplicas-1 squared for each primary region in the group.
082   */
083  protected final long costPerGroup(Int2IntCounterMap colocatedReplicaCounts) {
084    final AtomicLong cost = new AtomicLong(0);
085    // colocatedReplicaCounts is a sorted array of primary ids of regions. Replicas of regions
086    // sharing the same primary will have consecutive numbers in the array.
087    colocatedReplicaCounts.forEach((primary, count) -> {
088      if (count > 1) { // means consecutive primaries, indicating co-location
089        cost.getAndAdd((count - 1) * (count - 1));
090      }
091    });
092    return cost.longValue();
093  }
094
095  @Override
096  public final void updateWeight(double[] weights) {
097    weights[StochasticLoadBalancer.GeneratorType.RACK.ordinal()] += cost();
098  }
099}