018package org.apache.hadoop.hbase.master;
020import java.io.IOException;
021import java.text.DecimalFormat;
022import java.util.ArrayList;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Random;
028import java.util.Scanner;
029import java.util.Set;
030import java.util.TreeMap;
031import java.util.concurrent.ThreadLocalRandom;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.ClusterConnection;
041import org.apache.hadoop.hbase.client.Connection;
042import org.apache.hadoop.hbase.client.ConnectionFactory;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
045import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
046import org.apache.hadoop.hbase.util.FSUtils;
047import org.apache.hadoop.hbase.util.MunkresAssignment;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
053import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
054import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
055import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
056import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
057import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
059import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
060import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
/**
 * A tool that is used for manipulating and viewing favored nodes information for regions. Run with
 * -h to get a list of the options.
 */
070// TODO: Remove? Unused. Partially implemented only.
071public class RegionPlacementMaintainer {
072  private static final Logger LOG =
073    LoggerFactory.getLogger(RegionPlacementMaintainer.class.getName());
074  // The cost of a placement that should never be assigned.
075  private static final float MAX_COST = Float.POSITIVE_INFINITY;
077  // The cost of a placement that is undesirable but acceptable.
078  private static final float AVOID_COST = 100000f;
080  // The amount by which the cost of a placement is increased if it is the
081  // last slot of the server. This is done to more evenly distribute the slop
082  // amongst servers.
083  private static final float LAST_SLOT_COST_PENALTY = 0.5f;
085  // The amount by which the cost of a primary placement is penalized if it is
086  // not the host currently serving the region. This is done to minimize moves.
087  private static final float NOT_CURRENT_HOST_PENALTY = 0.1f;
089  private static boolean USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = false;
091  private Configuration conf;
092  private final boolean enforceLocality;
093  private final boolean enforceMinAssignmentMove;
094  private RackManager rackManager;
095  private Set<TableName> targetTableSet;
096  private final Connection connection;
098  public RegionPlacementMaintainer(Configuration conf) {
099    this(conf, true, true);
100  }
102  public RegionPlacementMaintainer(Configuration conf, boolean enforceLocality,
103    boolean enforceMinAssignmentMove) {
104    this.conf = conf;
105    this.enforceLocality = enforceLocality;
106    this.enforceMinAssignmentMove = enforceMinAssignmentMove;
107    this.targetTableSet = new HashSet<>();
108    this.rackManager = new RackManager(conf);
109    try {
110      this.connection = ConnectionFactory.createConnection(this.conf);
111    } catch (IOException e) {
112      throw new RuntimeException(e);
113    }
114  }
116  private static void printHelp(Options opt) {
117    new HelpFormatter().printHelp(
118      "RegionPlacement < -w | -u | -n | -v | -t | -h | -overwrite -r regionName -f favoredNodes "
119        + "-diff>" + " [-l false] [-m false] [-d] [-tables t1,t2,...tn] [-zk zk1,zk2,zk3]"
120        + " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]",
121      opt);
122  }
124  public void setTargetTableName(String[] tableNames) {
125    if (tableNames != null) {
126      for (String table : tableNames)
127        this.targetTableSet.add(TableName.valueOf(table));
128    }
129  }
131  /** Returns the new RegionAssignmentSnapshot n */
132  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
133    SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
134      new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
135    currentAssignmentShapshot.initialize();
136    return currentAssignmentShapshot;
137  }
139  /**
140   * Verify the region placement is consistent with the assignment plan
141   */
142  public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
143    throws IOException {
144    System.out
145      .println("Start to verify the region assignment and " + "generate the verification report");
146    // Get the region assignment snapshot
147    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
149    // Get all the tables
150    Set<TableName> tables = snapshot.getTableSet();
152    // Get the region locality map
153    Map<String, Map<String, Float>> regionLocalityMap = null;
154    if (this.enforceLocality == true) {
155      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
156    }
157    List<AssignmentVerificationReport> reports = new ArrayList<>();
158    // Iterate all the tables to fill up the verification report
159    for (TableName table : tables) {
160      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
161        continue;
162      }
163      AssignmentVerificationReport report = new AssignmentVerificationReport();
164      report.fillUp(table, snapshot, regionLocalityMap);
165      report.print(isDetailMode);
166      reports.add(report);
167    }
168    return reports;
169  }
171  /**
172   * Generate the assignment plan for the existing table
173   * @param munkresForSecondaryAndTertiary if set on true the assignment plan for the tertiary and
174   *                                       secondary will be generated with Munkres algorithm,
175   *                                       otherwise will be generated using
176   *                                       placeSecondaryAndTertiaryRS
177   */
178  private void genAssignmentPlan(TableName tableName,
179    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot,
180    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan plan,
181    boolean munkresForSecondaryAndTertiary) throws IOException {
182    // Get the all the regions for the current table
183    List<RegionInfo> regions = assignmentSnapshot.getTableToRegionMap().get(tableName);
184    int numRegions = regions.size();
186    // Get the current assignment map
187    Map<RegionInfo, ServerName> currentAssignmentMap =
188      assignmentSnapshot.getRegionToRegionServerMap();
190    // Get the all the region servers
191    List<ServerName> servers = new ArrayList<>();
192    try (Admin admin = this.connection.getAdmin()) {
193      servers.addAll(admin.getRegionServers());
194    }
196    LOG.info("Start to generate assignment plan for " + numRegions + " regions from table "
197      + tableName + " with " + servers.size() + " region servers");
199    int slotsPerServer = (int) Math.ceil((float) numRegions / servers.size());
200    int regionSlots = slotsPerServer * servers.size();
202    // Compute the primary, secondary and tertiary costs for each region/server
203    // pair. These costs are based only on node locality and rack locality, and
204    // will be modified later.
205    float[][] primaryCost = new float[numRegions][regionSlots];
206    float[][] secondaryCost = new float[numRegions][regionSlots];
207    float[][] tertiaryCost = new float[numRegions][regionSlots];
209    if (this.enforceLocality && regionLocalityMap != null) {
210      // Transform the locality mapping into a 2D array, assuming that any
211      // unspecified locality value is 0.
212      float[][] localityPerServer = new float[numRegions][regionSlots];
213      for (int i = 0; i < numRegions; i++) {
214        Map<String, Float> serverLocalityMap =
215          regionLocalityMap.get(regions.get(i).getEncodedName());
216        if (serverLocalityMap == null) {
217          continue;
218        }
219        for (int j = 0; j < servers.size(); j++) {
220          String serverName = servers.get(j).getHostname();
221          if (serverName == null) {
222            continue;
223          }
224          Float locality = serverLocalityMap.get(serverName);
225          if (locality == null) {
226            continue;
227          }
228          for (int k = 0; k < slotsPerServer; k++) {
229            // If we can't find the locality of a region to a server, which occurs
230            // because locality is only reported for servers which have some
231            // blocks of a region local, then the locality for that pair is 0.
232            localityPerServer[i][j * slotsPerServer + k] = locality.floatValue();
233          }
234        }
235      }
237      // Compute the total rack locality for each region in each rack. The total
238      // rack locality is the sum of the localities of a region on all servers in
239      // a rack.
240      Map<String, Map<RegionInfo, Float>> rackRegionLocality = new HashMap<>();
241      for (int i = 0; i < numRegions; i++) {
242        RegionInfo region = regions.get(i);
243        for (int j = 0; j < regionSlots; j += slotsPerServer) {
244          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
245          Map<RegionInfo, Float> rackLocality = rackRegionLocality.get(rack);
246          if (rackLocality == null) {
247            rackLocality = new HashMap<>();
248            rackRegionLocality.put(rack, rackLocality);
249          }
250          Float localityObj = rackLocality.get(region);
251          float locality = localityObj == null ? 0 : localityObj.floatValue();
252          locality += localityPerServer[i][j];
253          rackLocality.put(region, locality);
254        }
255      }
256      for (int i = 0; i < numRegions; i++) {
257        for (int j = 0; j < regionSlots; j++) {
258          String rack = rackManager.getRack(servers.get(j / slotsPerServer));
259          Float totalRackLocalityObj = rackRegionLocality.get(rack).get(regions.get(i));
260          float totalRackLocality =
261            totalRackLocalityObj == null ? 0 : totalRackLocalityObj.floatValue();
263          // Primary cost aims to favor servers with high node locality and low
264          // rack locality, so that secondaries and tertiaries can be chosen for
265          // nodes with high rack locality. This might give primaries with
266          // slightly less locality at first compared to a cost which only
267          // considers the node locality, but should be better in the long run.
268          primaryCost[i][j] = 1 - (2 * localityPerServer[i][j] - totalRackLocality);
270          // Secondary cost aims to favor servers with high node locality and high
271          // rack locality since the tertiary will be chosen from the same rack as
272          // the secondary. This could be negative, but that is okay.
273          secondaryCost[i][j] = 2 - (localityPerServer[i][j] + totalRackLocality);
275          // Tertiary cost is only concerned with the node locality. It will later
276          // be restricted to only hosts on the same rack as the secondary.
277          tertiaryCost[i][j] = 1 - localityPerServer[i][j];
278        }
279      }
280    }
282    if (this.enforceMinAssignmentMove && currentAssignmentMap != null) {
283      // We want to minimize the number of regions which move as the result of a
284      // new assignment. Therefore, slightly penalize any placement which is for
285      // a host that is not currently serving the region.
286      for (int i = 0; i < numRegions; i++) {
287        for (int j = 0; j < servers.size(); j++) {
288          ServerName currentAddress = currentAssignmentMap.get(regions.get(i));
289          if (currentAddress != null && !currentAddress.equals(servers.get(j))) {
290            for (int k = 0; k < slotsPerServer; k++) {
291              primaryCost[i][j * slotsPerServer + k] += NOT_CURRENT_HOST_PENALTY;
292            }
293          }
294        }
295      }
296    }
298    // Artificially increase cost of last slot of each server to evenly
299    // distribute the slop, otherwise there will be a few servers with too few
300    // regions and many servers with the max number of regions.
301    for (int i = 0; i < numRegions; i++) {
302      for (int j = 0; j < regionSlots; j += slotsPerServer) {
303        primaryCost[i][j] += LAST_SLOT_COST_PENALTY;
304        secondaryCost[i][j] += LAST_SLOT_COST_PENALTY;
305        tertiaryCost[i][j] += LAST_SLOT_COST_PENALTY;
306      }
307    }
309    RandomizedMatrix randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
310    primaryCost = randomizedMatrix.transform(primaryCost);
311    int[] primaryAssignment = new MunkresAssignment(primaryCost).solve();
312    primaryAssignment = randomizedMatrix.invertIndices(primaryAssignment);
314    // Modify the secondary and tertiary costs for each region/server pair to
315    // prevent a region from being assigned to the same rack for both primary
316    // and either one of secondary or tertiary.
317    for (int i = 0; i < numRegions; i++) {
318      int slot = primaryAssignment[i];
319      String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
320      for (int k = 0; k < servers.size(); k++) {
321        if (!rackManager.getRack(servers.get(k)).equals(rack)) {
322          continue;
323        }
324        if (k == slot / slotsPerServer) {
325          // Same node, do not place secondary or tertiary here ever.
326          for (int m = 0; m < slotsPerServer; m++) {
327            secondaryCost[i][k * slotsPerServer + m] = MAX_COST;
328            tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
329          }
330        } else {
331          // Same rack, do not place secondary or tertiary here if possible.
332          for (int m = 0; m < slotsPerServer; m++) {
333            secondaryCost[i][k * slotsPerServer + m] = AVOID_COST;
334            tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
335          }
336        }
337      }
338    }
339    if (munkresForSecondaryAndTertiary) {
340      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
341      secondaryCost = randomizedMatrix.transform(secondaryCost);
342      int[] secondaryAssignment = new MunkresAssignment(secondaryCost).solve();
343      secondaryAssignment = randomizedMatrix.invertIndices(secondaryAssignment);
345      // Modify the tertiary costs for each region/server pair to ensure that a
346      // region is assigned to a tertiary server on the same rack as its secondary
347      // server, but not the same server in that rack.
348      for (int i = 0; i < numRegions; i++) {
349        int slot = secondaryAssignment[i];
350        String rack = rackManager.getRack(servers.get(slot / slotsPerServer));
351        for (int k = 0; k < servers.size(); k++) {
352          if (k == slot / slotsPerServer) {
353            // Same node, do not place tertiary here ever.
354            for (int m = 0; m < slotsPerServer; m++) {
355              tertiaryCost[i][k * slotsPerServer + m] = MAX_COST;
356            }
357          } else {
358            if (rackManager.getRack(servers.get(k)).equals(rack)) {
359              continue;
360            }
361            // Different rack, do not place tertiary here if possible.
362            for (int m = 0; m < slotsPerServer; m++) {
363              tertiaryCost[i][k * slotsPerServer + m] = AVOID_COST;
364            }
365          }
366        }
367      }
369      randomizedMatrix = new RandomizedMatrix(numRegions, regionSlots);
370      tertiaryCost = randomizedMatrix.transform(tertiaryCost);
371      int[] tertiaryAssignment = new MunkresAssignment(tertiaryCost).solve();
372      tertiaryAssignment = randomizedMatrix.invertIndices(tertiaryAssignment);
374      for (int i = 0; i < numRegions; i++) {
375        List<ServerName> favoredServers =
376          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
377        ServerName s = servers.get(primaryAssignment[i] / slotsPerServer);
378        favoredServers
379          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
381        s = servers.get(secondaryAssignment[i] / slotsPerServer);
382        favoredServers
383          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
385        s = servers.get(tertiaryAssignment[i] / slotsPerServer);
386        favoredServers
387          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
388        // Update the assignment plan
389        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
390      }
391      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
392        + tableName + " with " + servers.size() + " region servers");
393      LOG.info("Assignment plan for secondary and tertiary generated " + "using MunkresAssignment");
394    } else {
395      Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
396      for (int i = 0; i < numRegions; i++) {
397        primaryRSMap.put(regions.get(i), servers.get(primaryAssignment[i] / slotsPerServer));
398      }
399      FavoredNodeAssignmentHelper favoredNodeHelper =
400        new FavoredNodeAssignmentHelper(servers, conf);
401      favoredNodeHelper.initialize();
402      Map<RegionInfo, ServerName[]> secondaryAndTertiaryMap =
403        favoredNodeHelper.placeSecondaryAndTertiaryWithRestrictions(primaryRSMap);
404      for (int i = 0; i < numRegions; i++) {
405        List<ServerName> favoredServers =
406          new ArrayList<>(FavoredNodeAssignmentHelper.FAVORED_NODES_NUM);
407        RegionInfo currentRegion = regions.get(i);
408        ServerName s = primaryRSMap.get(currentRegion);
409        favoredServers
410          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
412        ServerName[] secondaryAndTertiary = secondaryAndTertiaryMap.get(currentRegion);
413        s = secondaryAndTertiary[0];
414        favoredServers
415          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
417        s = secondaryAndTertiary[1];
418        favoredServers
419          .add(ServerName.valueOf(s.getHostname(), s.getPort(), ServerName.NON_STARTCODE));
420        // Update the assignment plan
421        plan.updateFavoredNodesMap(regions.get(i), favoredServers);
422      }
423      LOG.info("Generated the assignment plan for " + numRegions + " regions from table "
424        + tableName + " with " + servers.size() + " region servers");
425      LOG.info("Assignment plan for secondary and tertiary generated "
426        + "using placeSecondaryAndTertiaryWithRestrictions method");
427    }
428  }
430  public FavoredNodesPlan getNewAssignmentPlan() throws IOException {
431    // Get the current region assignment snapshot by scanning from the META
432    SnapshotOfRegionAssignmentFromMeta assignmentSnapshot = this.getRegionAssignmentSnapshot();
434    // Get the region locality map
435    Map<String, Map<String, Float>> regionLocalityMap = null;
436    if (this.enforceLocality) {
437      regionLocalityMap = FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
438    }
439    // Initialize the assignment plan
440    FavoredNodesPlan plan = new FavoredNodesPlan();
442    // Get the table to region mapping
443    Map<TableName, List<RegionInfo>> tableToRegionMap = assignmentSnapshot.getTableToRegionMap();
444    LOG.info("Start to generate the new assignment plan for the "
445      + +tableToRegionMap.keySet().size() + " tables");
446    for (TableName table : tableToRegionMap.keySet()) {
447      try {
448        if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
449          continue;
450        }
451        // TODO: maybe run the placement in parallel for each table
452        genAssignmentPlan(table, assignmentSnapshot, regionLocalityMap, plan,
454      } catch (Exception e) {
455        LOG.error("Get some exceptions for placing primary region server" + "for table " + table
456          + " because " + e);
457      }
458    }
459    LOG.info("Finish to generate the new assignment plan for the "
460      + +tableToRegionMap.keySet().size() + " tables");
461    return plan;
462  }
464  /**
465   * Some algorithms for solving the assignment problem may traverse workers or jobs in linear order
466   * which may result in skewing the assignments of the first jobs in the matrix toward the last
467   * workers in the matrix if the costs are uniform. To avoid this kind of clumping, we can
468   * randomize the rows and columns of the cost matrix in a reversible way, such that the solution
469   * to the assignment problem can be interpreted in terms of the original untransformed cost
470   * matrix. Rows and columns are transformed independently such that the elements contained in any
471   * row of the input matrix are the same as the elements in the corresponding output matrix, and
472   * each row has its elements transformed in the same way. Similarly for columns.
473   */
474  protected static class RandomizedMatrix {
475    private final int rows;
476    private final int cols;
477    private final int[] rowTransform;
478    private final int[] rowInverse;
479    private final int[] colTransform;
480    private final int[] colInverse;
482    /**
483     * Create a randomization scheme for a matrix of a given size.
484     * @param rows the number of rows in the matrix
485     * @param cols the number of columns in the matrix
486     */
487    public RandomizedMatrix(int rows, int cols) {
488      this.rows = rows;
489      this.cols = cols;
490      Random random = ThreadLocalRandom.current();
491      rowTransform = new int[rows];
492      rowInverse = new int[rows];
493      for (int i = 0; i < rows; i++) {
494        rowTransform[i] = i;
495      }
496      // Shuffle the row indices.
497      for (int i = rows - 1; i >= 0; i--) {
498        int r = random.nextInt(i + 1);
499        int temp = rowTransform[r];
500        rowTransform[r] = rowTransform[i];
501        rowTransform[i] = temp;
502      }
503      // Generate the inverse row indices.
504      for (int i = 0; i < rows; i++) {
505        rowInverse[rowTransform[i]] = i;
506      }
508      colTransform = new int[cols];
509      colInverse = new int[cols];
510      for (int i = 0; i < cols; i++) {
511        colTransform[i] = i;
512      }
513      // Shuffle the column indices.
514      for (int i = cols - 1; i >= 0; i--) {
515        int r = random.nextInt(i + 1);
516        int temp = colTransform[r];
517        colTransform[r] = colTransform[i];
518        colTransform[i] = temp;
519      }
520      // Generate the inverse column indices.
521      for (int i = 0; i < cols; i++) {
522        colInverse[colTransform[i]] = i;
523      }
524    }
526    /**
527     * Copy a given matrix into a new matrix, transforming each row index and each column index
528     * according to the randomization scheme that was created at construction time.
529     * @param matrix the cost matrix to transform
530     * @return a new matrix with row and column indices transformed
531     */
532    public float[][] transform(float[][] matrix) {
533      float[][] result = new float[rows][cols];
534      for (int i = 0; i < rows; i++) {
535        for (int j = 0; j < cols; j++) {
536          result[rowTransform[i]][colTransform[j]] = matrix[i][j];
537        }
538      }
539      return result;
540    }
542    /**
543     * Copy a given matrix into a new matrix, transforming each row index and each column index
544     * according to the inverse of the randomization scheme that was created at construction time.
545     * @param matrix the cost matrix to be inverted
546     * @return a new matrix with row and column indices inverted
547     */
548    public float[][] invert(float[][] matrix) {
549      float[][] result = new float[rows][cols];
550      for (int i = 0; i < rows; i++) {
551        for (int j = 0; j < cols; j++) {
552          result[rowInverse[i]][colInverse[j]] = matrix[i][j];
553        }
554      }
555      return result;
556    }
558    /**
559     * Given an array where each element {@code indices[i]} represents the randomized column index
560     * corresponding to randomized row index {@code i}, create a new array with the corresponding
561     * inverted indices.
562     * @param indices an array of transformed indices to be inverted
563     * @return an array of inverted indices
564     */
565    public int[] invertIndices(int[] indices) {
566      int[] result = new int[indices.length];
567      for (int i = 0; i < indices.length; i++) {
568        result[rowInverse[i]] = colInverse[indices[i]];
569      }
570      return result;
571    }
572  }
574  /**
575   * Print the assignment plan to the system output stream
576   */
577  public static void printAssignmentPlan(FavoredNodesPlan plan) {
578    if (plan == null) return;
579    LOG.info("========== Start to print the assignment plan ================");
580    // sort the map based on region info
581    Map<String, List<ServerName>> assignmentMap = new TreeMap<>(plan.getAssignmentMap());
583    for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
585      String serverList = FavoredNodeAssignmentHelper.getFavoredNodesAsString(entry.getValue());
586      String regionName = entry.getKey();
587      LOG.info("Region: " + regionName);
588      LOG.info("Its favored nodes: " + serverList);
589    }
590    LOG.info("========== Finish to print the assignment plan ================");
591  }
593  /**
594   * Update the assignment plan into hbase:meta
595   * @param plan the assignments plan to be updated into hbase:meta
596   * @throws IOException if cannot update assignment plan in hbase:meta
597   */
598  public void updateAssignmentPlanToMeta(FavoredNodesPlan plan) throws IOException {
599    try {
600      LOG.info("Start to update the hbase:meta with the new assignment plan");
601      Map<String, List<ServerName>> assignmentMap = plan.getAssignmentMap();
602      Map<RegionInfo, List<ServerName>> planToUpdate = new HashMap<>(assignmentMap.size());
603      Map<String, RegionInfo> regionToRegionInfoMap =
604        getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap();
605      for (Map.Entry<String, List<ServerName>> entry : assignmentMap.entrySet()) {
606        planToUpdate.put(regionToRegionInfoMap.get(entry.getKey()), entry.getValue());
607      }
609      FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(planToUpdate, conf);
610      LOG.info("Updated the hbase:meta with the new assignment plan");
611    } catch (Exception e) {
612      LOG.error(
613        "Failed to update hbase:meta with the new assignment" + "plan because " + e.getMessage());
614    }
615  }
617  /**
618   * Update the assignment plan to all the region servers
619   */
620  private void updateAssignmentPlanToRegionServers(FavoredNodesPlan plan) throws IOException {
621    LOG.info("Start to update the region servers with the new assignment plan");
622    // Get the region to region server map
623    Map<ServerName, List<RegionInfo>> currentAssignment =
624      this.getRegionAssignmentSnapshot().getRegionServerToRegionMap();
626    // track of the failed and succeeded updates
627    int succeededNum = 0;
628    Map<ServerName, Exception> failedUpdateMap = new HashMap<>();
630    for (Map.Entry<ServerName, List<RegionInfo>> entry : currentAssignment.entrySet()) {
631      List<Pair<RegionInfo, List<ServerName>>> regionUpdateInfos = new ArrayList<>();
632      try {
633        // Keep track of the favored updates for the current region server
634        FavoredNodesPlan singleServerPlan = null;
635        // Find out all the updates for the current region server
636        for (RegionInfo region : entry.getValue()) {
637          List<ServerName> favoredServerList = plan.getFavoredNodes(region);
638          if (
639            favoredServerList != null
640              && favoredServerList.size() == FavoredNodeAssignmentHelper.FAVORED_NODES_NUM
641          ) {
642            // Create the single server plan if necessary
643            if (singleServerPlan == null) {
644              singleServerPlan = new FavoredNodesPlan();
645            }
646            // Update the single server update
647            singleServerPlan.updateFavoredNodesMap(region, favoredServerList);
648            regionUpdateInfos.add(new Pair<>(region, favoredServerList));
649          }
650        }
651        if (singleServerPlan != null) {
652          // Update the current region server with its updated favored nodes
653          BlockingInterface currentRegionServer =
654            ((ClusterConnection) this.connection).getAdmin(entry.getKey());
655          UpdateFavoredNodesRequest request =
656            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
658          UpdateFavoredNodesResponse updateFavoredNodesResponse =
659            currentRegionServer.updateFavoredNodes(null, request);
660          LOG.info(
661            "Region server " + ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName()
662              + " has updated " + updateFavoredNodesResponse.getResponse() + " / "
663              + singleServerPlan.size() + " regions with the assignment plan");
664          succeededNum++;
665        }
666      } catch (Exception e) {
667        failedUpdateMap.put(entry.getKey(), e);
668      }
669    }
670    // log the succeeded updates
671    LOG.info("Updated " + succeededNum + " region servers with " + "the new assignment plan");
673    // log the failed updates
674    int failedNum = failedUpdateMap.size();
675    if (failedNum != 0) {
676      LOG.error("Failed to update the following + " + failedNum
677        + " region servers with its corresponding favored nodes");
678      for (Map.Entry<ServerName, Exception> entry : failedUpdateMap.entrySet()) {
679        LOG.error("Failed to update " + entry.getKey().getAddress() + " because of "
680          + entry.getValue().getMessage());
681      }
682    }
683  }
685  public void updateAssignmentPlan(FavoredNodesPlan plan) throws IOException {
686    LOG.info("Start to update the new assignment plan for the hbase:meta table and"
687      + " the region servers");
688    // Update the new assignment plan to META
689    updateAssignmentPlanToMeta(plan);
690    // Update the new assignment plan to Region Servers
691    updateAssignmentPlanToRegionServers(plan);
692    LOG.info("Finish to update the new assignment plan for the hbase:meta table and"
693      + " the region servers");
694  }
696  /**
697   * Return how many regions will move per table since their primary RS will change
698   * @param newPlan - new AssignmentPlan
699   * @return how many primaries will move per table
700   */
701  public Map<TableName, Integer> getRegionsMovement(FavoredNodesPlan newPlan) throws IOException {
702    Map<TableName, Integer> movesPerTable = new HashMap<>();
703    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
704    Map<TableName, List<RegionInfo>> tableToRegions = snapshot.getTableToRegionMap();
705    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
706    Set<TableName> tables = snapshot.getTableSet();
707    for (TableName table : tables) {
708      int movedPrimaries = 0;
709      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
710        continue;
711      }
712      List<RegionInfo> regions = tableToRegions.get(table);
713      for (RegionInfo region : regions) {
714        List<ServerName> oldServers = oldPlan.getFavoredNodes(region);
715        List<ServerName> newServers = newPlan.getFavoredNodes(region);
716        if (oldServers != null && newServers != null) {
717          ServerName oldPrimary = oldServers.get(0);
718          ServerName newPrimary = newServers.get(0);
719          if (oldPrimary.compareTo(newPrimary) != 0) {
720            movedPrimaries++;
721          }
722        }
723      }
724      movesPerTable.put(table, movedPrimaries);
725    }
726    return movesPerTable;
727  }
729  /**
730   * Compares two plans and check whether the locality dropped or increased (prints the information
731   * as a string) also prints the baseline locality
732   * @param movesPerTable     - how many primary regions will move per table
733   * @param regionLocalityMap - locality map from FS
734   * @param newPlan           - new assignment plan
735   */
736  public void checkDifferencesWithOldPlan(Map<TableName, Integer> movesPerTable,
737    Map<String, Map<String, Float>> regionLocalityMap, FavoredNodesPlan newPlan)
738    throws IOException {
739    // localities for primary, secondary and tertiary
740    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
741    FavoredNodesPlan oldPlan = snapshot.getExistingAssignmentPlan();
742    Set<TableName> tables = snapshot.getTableSet();
743    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
744    for (TableName table : tables) {
745      float[] deltaLocality = new float[3];
746      float[] locality = new float[3];
747      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
748        continue;
749      }
750      List<RegionInfo> regions = tableToRegionsMap.get(table);
751      System.out.println("==================================================");
752      System.out.println("Assignment Plan Projection Report For Table: " + table);
753      System.out.println("\t Total regions: " + regions.size());
754      System.out.println(
755        "\t" + movesPerTable.get(table) + " primaries will move due to their primary has changed");
756      for (RegionInfo currentRegion : regions) {
757        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
758        if (regionLocality == null) {
759          continue;
760        }
761        List<ServerName> oldServers = oldPlan.getFavoredNodes(currentRegion);
762        List<ServerName> newServers = newPlan.getFavoredNodes(currentRegion);
763        if (newServers != null && oldServers != null) {
764          int i = 0;
765          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
766            ServerName newServer = newServers.get(p.ordinal());
767            ServerName oldServer = oldServers.get(p.ordinal());
768            Float oldLocality = 0f;
769            if (oldServers != null) {
770              oldLocality = regionLocality.get(oldServer.getHostname());
771              if (oldLocality == null) {
772                oldLocality = 0f;
773              }
774              locality[i] += oldLocality;
775            }
776            Float newLocality = regionLocality.get(newServer.getHostname());
777            if (newLocality == null) {
778              newLocality = 0f;
779            }
780            deltaLocality[i] += newLocality - oldLocality;
781            i++;
782          }
783        }
784      }
785      DecimalFormat df = new java.text.DecimalFormat("#.##");
786      for (int i = 0; i < deltaLocality.length; i++) {
787        System.out.print("\t\t Baseline locality for ");
788        if (i == 0) {
789          System.out.print("primary ");
790        } else if (i == 1) {
791          System.out.print("secondary ");
792        } else if (i == 2) {
793          System.out.print("tertiary ");
794        }
795        System.out.println(df.format(100 * locality[i] / regions.size()) + "%");
796        System.out.print("\t\t Locality will change with the new plan: ");
797        System.out.println(df.format(100 * deltaLocality[i] / regions.size()) + "%");
798      }
799      System.out.println("\t Baseline dispersion");
800      printDispersionScores(table, snapshot, regions.size(), null, true);
801      System.out.println("\t Projected dispersion");
802      printDispersionScores(table, snapshot, regions.size(), newPlan, true);
803    }
804  }
806  public void printDispersionScores(TableName table, SnapshotOfRegionAssignmentFromMeta snapshot,
807    int numRegions, FavoredNodesPlan newPlan, boolean simplePrint) {
808    if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
809      return;
810    }
811    AssignmentVerificationReport report = new AssignmentVerificationReport();
812    report.fillUpDispersion(table, snapshot, newPlan);
813    List<Float> dispersion = report.getDispersionInformation();
814    if (simplePrint) {
815      DecimalFormat df = new java.text.DecimalFormat("#.##");
816      System.out.println("\tAvg dispersion score: " + df.format(dispersion.get(0))
817        + " hosts;\tMax dispersion score: " + df.format(dispersion.get(1))
818        + " hosts;\tMin dispersion score: " + df.format(dispersion.get(2)) + " hosts;");
819    } else {
820      LOG.info("For Table: " + table + " ; #Total Regions: " + numRegions
821        + " ; The average dispersion score is " + dispersion.get(0));
822    }
823  }
825  public void printLocalityAndDispersionForCurrentPlan(
826    Map<String, Map<String, Float>> regionLocalityMap) throws IOException {
827    SnapshotOfRegionAssignmentFromMeta snapshot = this.getRegionAssignmentSnapshot();
828    FavoredNodesPlan assignmentPlan = snapshot.getExistingAssignmentPlan();
829    Set<TableName> tables = snapshot.getTableSet();
830    Map<TableName, List<RegionInfo>> tableToRegionsMap = snapshot.getTableToRegionMap();
831    for (TableName table : tables) {
832      float[] locality = new float[3];
833      if (!this.targetTableSet.isEmpty() && !this.targetTableSet.contains(table)) {
834        continue;
835      }
836      List<RegionInfo> regions = tableToRegionsMap.get(table);
837      for (RegionInfo currentRegion : regions) {
838        Map<String, Float> regionLocality = regionLocalityMap.get(currentRegion.getEncodedName());
839        if (regionLocality == null) {
840          continue;
841        }
842        List<ServerName> servers = assignmentPlan.getFavoredNodes(currentRegion);
843        if (servers != null) {
844          int i = 0;
845          for (FavoredNodesPlan.Position p : FavoredNodesPlan.Position.values()) {
846            ServerName server = servers.get(p.ordinal());
847            Float currentLocality = 0f;
848            if (servers != null) {
849              currentLocality = regionLocality.get(server.getHostname());
850              if (currentLocality == null) {
851                currentLocality = 0f;
852              }
853              locality[i] += currentLocality;
854            }
855            i++;
856          }
857        }
858      }
859      for (int i = 0; i < locality.length; i++) {
860        String copy = null;
861        if (i == 0) {
862          copy = "primary";
863        } else if (i == 1) {
864          copy = "secondary";
865        } else if (i == 2) {
866          copy = "tertiary";
867        }
868        float avgLocality = 100 * locality[i] / regions.size();
869        LOG.info("For Table: " + table + " ; #Total Regions: " + regions.size()
870          + " ; The average locality for " + copy + " is " + avgLocality + " %");
871      }
872      printDispersionScores(table, snapshot, regions.size(), null, false);
873    }
874  }
876  /**
877   * @param favoredNodesStr The String of favored nodes
878   * @return the list of ServerName for the byte array of favored nodes.
879   */
880  public static List<ServerName> getFavoredNodeList(String favoredNodesStr) {
881    String[] favoredNodesArray = StringUtils.split(favoredNodesStr, ",");
882    if (favoredNodesArray == null) return null;
884    List<ServerName> serverList = new ArrayList<>();
885    for (String hostNameAndPort : favoredNodesArray) {
886      serverList.add(ServerName.valueOf(hostNameAndPort, ServerName.NON_STARTCODE));
887    }
888    return serverList;
889  }
891  public static void main(String args[]) throws IOException {
892    Options opt = new Options();
893    opt.addOption("w", "write", false, "write the assignments to hbase:meta only");
894    opt.addOption("u", "update", false,
895      "update the assignments to hbase:meta and RegionServers together");
896    opt.addOption("n", "dry-run", false, "do not write assignments to META");
897    opt.addOption("v", "verify", false, "verify current assignments against META");
898    opt.addOption("p", "print", false, "print the current assignment plan in META");
899    opt.addOption("h", "help", false, "print usage");
900    opt.addOption("d", "verification-details", false, "print the details of verification report");
902    opt.addOption("zk", true, "to set the zookeeper quorum");
903    opt.addOption("fs", true, "to set HDFS");
904    opt.addOption("hbase_root", true, "to set hbase_root directory");
906    opt.addOption("overwrite", false, "overwrite the favored nodes for a single region,"
907      + "for example: -update -r regionName -f server1:port,server2:port,server3:port");
908    opt.addOption("r", true, "The region name that needs to be updated");
909    opt.addOption("f", true, "The new favored nodes");
911    opt.addOption("tables", true,
912      "The list of table names splitted by ',' ;" + "For example: -tables: t1,t2,...,tn");
913    opt.addOption("l", "locality", true, "enforce the maximum locality");
914    opt.addOption("m", "min-move", true, "enforce minimum assignment move");
915    opt.addOption("diff", false, "calculate difference between assignment plans");
916    opt.addOption("munkres", false, "use munkres to place secondaries and tertiaries");
917    opt.addOption("ld", "locality-dispersion", false,
918      "print locality and dispersion " + "information for current plan");
919    try {
920      CommandLine cmd = new GnuParser().parse(opt, args);
921      Configuration conf = HBaseConfiguration.create();
923      boolean enforceMinAssignmentMove = true;
924      boolean enforceLocality = true;
925      boolean verificationDetails = false;
927      // Read all the options
928      if (
929        (cmd.hasOption("l") && cmd.getOptionValue("l").equalsIgnoreCase("false"))
930          || (cmd.hasOption("locality") && cmd.getOptionValue("locality").equalsIgnoreCase("false"))
931      ) {
932        enforceLocality = false;
933      }
935      if (
936        (cmd.hasOption("m") && cmd.getOptionValue("m").equalsIgnoreCase("false"))
937          || (cmd.hasOption("min-move") && cmd.getOptionValue("min-move").equalsIgnoreCase("false"))
938      ) {
939        enforceMinAssignmentMove = false;
940      }
942      if (cmd.hasOption("zk")) {
943        conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue("zk"));
944        LOG.info("Setting the zk quorum: " + conf.get(HConstants.ZOOKEEPER_QUORUM));
945      }
947      if (cmd.hasOption("fs")) {
948        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, cmd.getOptionValue("fs"));
949        LOG.info("Setting the HDFS: " + conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
950      }
952      if (cmd.hasOption("hbase_root")) {
953        conf.set(HConstants.HBASE_DIR, cmd.getOptionValue("hbase_root"));
954        LOG.info("Setting the hbase root directory: " + conf.get(HConstants.HBASE_DIR));
955      }
957      // Create the region placement obj
958      RegionPlacementMaintainer rp =
959        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove);
961      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
962        verificationDetails = true;
963      }
965      if (cmd.hasOption("tables")) {
966        String tableNameListStr = cmd.getOptionValue("tables");
967        String[] tableNames = StringUtils.split(tableNameListStr, ",");
968        rp.setTargetTableName(tableNames);
969      }
971      if (cmd.hasOption("munkres")) {
973      }
975      // Read all the modes
976      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
977        // Verify the region placement.
978        rp.verifyRegionPlacement(verificationDetails);
979      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
980        // Generate the assignment plan only without updating the hbase:meta and RS
981        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
982        printAssignmentPlan(plan);
983      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
984        // Generate the new assignment plan
985        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
986        // Print the new assignment plan
987        printAssignmentPlan(plan);
988        // Write the new assignment plan to META
989        rp.updateAssignmentPlanToMeta(plan);
990      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
991        // Generate the new assignment plan
992        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
993        // Print the new assignment plan
994        printAssignmentPlan(plan);
995        // Update the assignment to hbase:meta and Region Servers
996        rp.updateAssignmentPlan(plan);
997      } else if (cmd.hasOption("diff")) {
998        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
999        Map<String, Map<String, Float>> locality =
1000          FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1001        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
1002        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
1003        System.out.println("Do you want to update the assignment plan? [y/n]");
1004        Scanner s = new Scanner(System.in);
1005        String input = s.nextLine().trim();
1006        if (input.equals("y")) {
1007          System.out.println("Updating assignment plan...");
1008          rp.updateAssignmentPlan(newPlan);
1009        }
1010        s.close();
1011      } else if (cmd.hasOption("ld")) {
1012        Map<String, Map<String, Float>> locality =
1013          FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
1014        rp.printLocalityAndDispersionForCurrentPlan(locality);
1015      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
1016        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
1017        printAssignmentPlan(plan);
1018      } else if (cmd.hasOption("overwrite")) {
1019        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
1020          throw new IllegalArgumentException("Please specify: "
1021            + " -update -r regionName -f server1:port,server2:port,server3:port");
1022        }
1024        String regionName = cmd.getOptionValue("r");
1025        String favoredNodesStr = cmd.getOptionValue("f");
1026        LOG.info("Going to update the region " + regionName + " with the new favored nodes "
1027          + favoredNodesStr);
1028        List<ServerName> favoredNodes = null;
1029        RegionInfo regionInfo =
1030          rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
1031        if (regionInfo == null) {
1032          LOG.error("Cannot find the region " + regionName + " from the META");
1033        } else {
1034          try {
1035            favoredNodes = getFavoredNodeList(favoredNodesStr);
1036          } catch (IllegalArgumentException e) {
1037            LOG.error("Cannot parse the invalid favored nodes because " + e);
1038          }
1039          FavoredNodesPlan newPlan = new FavoredNodesPlan();
1040          newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
1041          rp.updateAssignmentPlan(newPlan);
1042        }
1043      } else {
1044        printHelp(opt);
1045      }
1046    } catch (ParseException e) {
1047      printHelp(opt);
1048    }
1049  }