/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MAX_RUNNING_TIME_KEY;
import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

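/**
 * Test helpers for exercising {@link StochasticLoadBalancer} candidate generators: builds mock
 * cluster states, runs the balancer against them, and validates the resulting region distribution.
 */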
public final class CandidateGeneratorTestUtil {

  private static final Logger LOG = LoggerFactory.getLogger(CandidateGeneratorTestUtil.class);

  private CandidateGeneratorTestUtil() {
  }

  enum ExhaustionType {
    COST_GOAL_ACHIEVED,
    NO_MORE_MOVES;
  }

  static void runBalancerToExhaustion(Configuration conf,
    Map<ServerName, List<RegionInfo>> serverToRegions,
    Set<Function<BalancerClusterState, Boolean>> expectations, float targetMaxBalancerCost) {
    runBalancerToExhaustion(conf, serverToRegions, expectations, targetMaxBalancerCost, 15000,
      ExhaustionType.COST_GOAL_ACHIEVED);
  }

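  /**
   * Repeatedly runs the stochastic balancer against the given assignment, applying every returned
   * {@link RegionPlan} and rebuilding the mock cluster state, until all expectations hold and the
   * requested {@link ExhaustionType} condition is met. Throws a RuntimeException if the
   * expectations are not satisfied within 10 balancer runs.
   */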
  static void runBalancerToExhaustion(Configuration conf,
    Map<ServerName, List<RegionInfo>> serverToRegions,
    Set<Function<BalancerClusterState, Boolean>> expectations, float targetMaxBalancerCost,
    long maxRunningTime, ExhaustionType exhaustionType) {
    // Do the full plan. We're testing with a lot of regions
    conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
    conf.setLong(MAX_RUNNING_TIME_KEY, maxRunningTime);

    conf.setFloat(MIN_COST_NEED_BALANCE_KEY, targetMaxBalancerCost);

    BalancerClusterState cluster = createMockBalancerClusterState(serverToRegions);
    StochasticLoadBalancer stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
    printClusterDistribution(cluster, 0);
    int balancerRuns = 0;
    int actionsTaken = 0;
    long balancingMillis = 0;
    boolean isBalanced = false;
    while (!isBalanced) {
      balancerRuns++;
      if (balancerRuns > 10) {
        throw new RuntimeException("Balancer failed to find balance & meet expectations");
      }
      long start = System.currentTimeMillis();
      List<RegionPlan> regionPlans =
        stochasticLoadBalancer.balanceCluster(partitionRegionsByTable(serverToRegions));
      balancingMillis += System.currentTimeMillis() - start;
      actionsTaken++;
      if (regionPlans != null) {
        // Apply all plans to serverToRegions
        for (RegionPlan rp : regionPlans) {
          ServerName source = rp.getSource();
          ServerName dest = rp.getDestination();
          RegionInfo region = rp.getRegionInfo();

          // Update serverToRegions
          serverToRegions.get(source).remove(region);
          serverToRegions.get(dest).add(region);
          actionsTaken++;
        }

        // Now rebuild cluster and balancer from updated serverToRegions
        cluster = createMockBalancerClusterState(serverToRegions);
        stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf);
      }
      printClusterDistribution(cluster, actionsTaken);
      isBalanced = true;
      for (Function<BalancerClusterState, Boolean> condition : expectations) {
        // Check if we've met all expectations for the candidate generator
        if (!condition.apply(cluster)) {
          isBalanced = false;
          break;
        }
      }
      if (isBalanced) { // Check if the balancer thinks we're done too
        if (exhaustionType == ExhaustionType.COST_GOAL_ACHIEVED) {
          // If we expect to achieve the cost goal, then needsBalance should be false
          if (stochasticLoadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, cluster)) {
            LOG.info("Balancer cost goal is not achieved. needsBalance=true");
            isBalanced = false;
          }
        } else {
          // If we anticipate running out of moves, then our last balance run should have produced
          // nothing
          if (regionPlans != null && !regionPlans.isEmpty()) {
            LOG.info("Balancer is not out of moves. regionPlans.size()={}", regionPlans.size());
            isBalanced = false;
          }
        }
      }
    }
    LOG.info("Balancer is done. Balancing took {}sec",
      Duration.ofMillis(balancingMillis).getSeconds());
  }

  /**
   * Prints the current cluster distribution of regions per table per server.
   */
  static void printClusterDistribution(BalancerClusterState cluster, long actionsTaken) {
    LOG.info("=== Cluster Distribution after {} balancer actions taken ===", actionsTaken);

    for (int i = 0; i < cluster.numServers; i++) {
      int[] regions = cluster.regionsPerServer[i];
      int regionCount = (regions == null) ? 0 : regions.length;

      LOG.info("Server {}: {} regions", cluster.servers[i].getServerName(), regionCount);

      if (regionCount > 0) {
        Map<TableName, Integer> tableRegionCounts = new HashMap<>();

        for (int regionIndex : regions) {
          RegionInfo regionInfo = cluster.regions[regionIndex];
          TableName tableName = regionInfo.getTable();
          tableRegionCounts.put(tableName, tableRegionCounts.getOrDefault(tableName, 0) + 1);
        }

        tableRegionCounts
          .forEach((table, count) -> LOG.info("  - Table {}: {} regions", table, count));
      }
    }

    LOG.info("===========================================");
  }

  /**
   * Partitions the given serverToRegions map by table. The tables are derived from the RegionInfo
   * objects found in serverToRegions.
   * @param serverToRegions The map of servers to their assigned regions.
   * @return A map of tables to their server-to-region assignments.
   */
  public static Map<TableName, Map<ServerName, List<RegionInfo>>>
    partitionRegionsByTable(Map<ServerName, List<RegionInfo>> serverToRegions) {

    // First, gather all tables from the regions
    Set<TableName> allTables = new HashSet<>();
    for (List<RegionInfo> regions : serverToRegions.values()) {
      for (RegionInfo region : regions) {
        allTables.add(region.getTable());
      }
    }

    Map<TableName, Map<ServerName, List<RegionInfo>>> tablesToServersToRegions = new HashMap<>();

    // Initialize each table with all servers mapped to empty lists
    for (TableName table : allTables) {
      Map<ServerName, List<RegionInfo>> serverMap = new HashMap<>();
      for (ServerName server : serverToRegions.keySet()) {
        serverMap.put(server, new ArrayList<>());
      }
      tablesToServersToRegions.put(table, serverMap);
    }

    // Distribute regions to their respective tables
    for (Map.Entry<ServerName, List<RegionInfo>> serverAndRegions : serverToRegions.entrySet()) {
      ServerName server = serverAndRegions.getKey();
      List<RegionInfo> regions = serverAndRegions.getValue();

      for (RegionInfo region : regions) {
        TableName regionTable = region.getTable();
        // Now we know for sure regionTable is in allTables
        Map<ServerName, List<RegionInfo>> tableServerMap =
          tablesToServersToRegions.get(regionTable);
        tableServerMap.get(server).add(region);
      }
    }

    return tablesToServersToRegions;
  }

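  /**
   * Builds a {@link StochasticLoadBalancer} backed by dummy metrics and cluster info providers,
   * configured from the given conf and initialized with costs for the given cluster state.
   */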
  static StochasticLoadBalancer buildStochasticLoadBalancer(BalancerClusterState cluster,
    Configuration conf) {
    StochasticLoadBalancer stochasticLoadBalancer =
      new StochasticLoadBalancer(new DummyMetricsStochasticBalancer());
    stochasticLoadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
    stochasticLoadBalancer.loadConf(conf);
    stochasticLoadBalancer.initCosts(cluster);
    return stochasticLoadBalancer;
  }

  static BalancerClusterState
    createMockBalancerClusterState(Map<ServerName, List<RegionInfo>> serverToRegions) {
    return new BalancerClusterState(serverToRegions, null, null, null, null);
  }

  /**
   * Validates that replicas are isolated from one another. Ensures that no server hosts more than
   * one replica of the same region (i.e., regions with identical start and end keys).
   * @param cluster The current state of the cluster.
   * @return true if all replicas are properly isolated, false otherwise.
   */
  static boolean areAllReplicasDistributed(BalancerClusterState cluster) {
    // Iterate over each server
    for (int[] regionsPerServer : cluster.regionsPerServer) {
      if (regionsPerServer == null || regionsPerServer.length == 0) {
        continue; // Skip empty servers
      }

      Set<ReplicaKey> foundKeys = new HashSet<>();
      for (int regionIndex : regionsPerServer) {
        RegionInfo regionInfo = cluster.regions[regionIndex];
        ReplicaKey replicaKey = new ReplicaKey(regionInfo);
        if (foundKeys.contains(replicaKey)) {
          // Violation: Multiple replicas of the same region on the same server
          LOG.warn("Replica isolation violated: one server hosts multiple replicas of key [{}].",
            generateRegionKey(regionInfo));
          return false;
        }

        foundKeys.add(replicaKey);
      }
    }

    LOG.info(
      "Replica isolation validation passed: No server hosts multiple replicas of the same region.");
    return true;
  }

  /**
   * Validates table isolation: no server may host both regions of the given table and regions of
   * any other table.
   */
  static boolean isTableIsolated(BalancerClusterState cluster, TableName tableName,
    String tableType) {
    for (int i = 0; i < cluster.numServers; i++) {
      int[] regionsOnServer = cluster.regionsPerServer[i];
      if (regionsOnServer == null || regionsOnServer.length == 0) {
        continue; // Skip empty servers
      }

      boolean hasTargetTableRegion = false;
      boolean hasOtherTableRegion = false;

      for (int regionIndex : regionsOnServer) {
        RegionInfo regionInfo = cluster.regions[regionIndex];
        if (regionInfo.getTable().equals(tableName)) {
          hasTargetTableRegion = true;
        } else {
          hasOtherTableRegion = true;
        }

        // If the target table and any other table are on the same server, isolation is violated
        if (hasTargetTableRegion && hasOtherTableRegion) {
          LOG.debug(
            "Server {} has both {} table regions and other table regions, violating isolation.",
            cluster.servers[i].getServerName(), tableType);
          return false;
        }
      }
    }
    LOG.debug("{} table isolation validation passed.", tableType);
    return true;
  }

  /**
   * Generates a unique key for a region based on its start and end keys. This method ensures that
   * regions with identical start and end keys have the same key.
   * @param regionInfo The RegionInfo object.
   * @return A string representing the unique key of the region.
   */
  private static String generateRegionKey(RegionInfo regionInfo) {
    // Using Base64 encoding for byte arrays to ensure uniqueness and readability
    String startKey = Base64.getEncoder().encodeToString(regionInfo.getStartKey());
    String endKey = Base64.getEncoder().encodeToString(regionInfo.getEndKey());

    return regionInfo.getTable().getNameAsString() + ":" + startKey + ":" + endKey;
  }

}