001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicBoolean;
034import java.util.concurrent.locks.Condition;
035import java.util.concurrent.locks.ReentrantLock;
036import java.util.function.Consumer;
037import java.util.function.Function;
038import java.util.stream.Collectors;
039import java.util.stream.Stream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.CatalogFamilyFormat;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.HBaseIOException;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.PleaseHoldException;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.UnknownRegionException;
049import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
050import org.apache.hadoop.hbase.client.MasterSwitchType;
051import org.apache.hadoop.hbase.client.RegionInfo;
052import org.apache.hadoop.hbase.client.RegionInfoBuilder;
053import org.apache.hadoop.hbase.client.RegionReplicaUtil;
054import org.apache.hadoop.hbase.client.RegionStatesCount;
055import org.apache.hadoop.hbase.client.Result;
056import org.apache.hadoop.hbase.client.ResultScanner;
057import org.apache.hadoop.hbase.client.Scan;
058import org.apache.hadoop.hbase.client.TableDescriptor;
059import org.apache.hadoop.hbase.client.TableState;
060import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
061import org.apache.hadoop.hbase.favored.FavoredNodesManager;
062import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
063import org.apache.hadoop.hbase.master.LoadBalancer;
064import org.apache.hadoop.hbase.master.MasterServices;
065import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
066import org.apache.hadoop.hbase.master.RegionPlan;
067import org.apache.hadoop.hbase.master.RegionState;
068import org.apache.hadoop.hbase.master.RegionState.State;
069import org.apache.hadoop.hbase.master.ServerManager;
070import org.apache.hadoop.hbase.master.TableStateManager;
071import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
072import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
073import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
074import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
075import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
076import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
077import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
078import org.apache.hadoop.hbase.master.region.MasterRegion;
079import org.apache.hadoop.hbase.procedure2.Procedure;
080import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
081import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
082import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
083import org.apache.hadoop.hbase.procedure2.util.StringUtils;
084import org.apache.hadoop.hbase.regionserver.SequenceId;
085import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
086import org.apache.hadoop.hbase.util.Bytes;
087import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
088import org.apache.hadoop.hbase.util.FutureUtils;
089import org.apache.hadoop.hbase.util.Pair;
090import org.apache.hadoop.hbase.util.Threads;
091import org.apache.hadoop.hbase.util.VersionInfo;
092import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
093import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
094import org.apache.yetus.audience.InterfaceAudience;
095import org.apache.zookeeper.KeeperException;
096import org.slf4j.Logger;
097import org.slf4j.LoggerFactory;
098
099import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
104
105/**
106 * The AssignmentManager is the coordinator for region assign/unassign operations.
107 * <ul>
108 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
109 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
110 * </ul>
111 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
112 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
113 * are triggered by DisableTable, Split, Merge
114 */
115@InterfaceAudience.Private
116public class AssignmentManager {
117  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);
118
119  // TODO: AMv2
120  // - handle region migration from hbase1 to hbase2.
121  // - handle sys table assignment first (e.g. acl, namespace)
122  // - handle table priorities
123  // - If ServerBusyException trying to update hbase:meta, we abort the Master
124  // See updateRegionLocation in RegionStateStore.
125  //
126  // See also
127  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
128  // for other TODOs.
129
130  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
131    "hbase.assignment.bootstrap.thread.pool.size";
132
133  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
134    "hbase.assignment.dispatch.wait.msec";
135  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;
136
137  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
138    "hbase.assignment.dispatch.wait.queue.max.size";
139  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;
140
141  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
142    "hbase.assignment.rit.chore.interval.msec";
143  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;
144
145  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
146    "hbase.assignment.dead.region.metric.chore.interval.msec";
147  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;
148
149  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
150  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;
151
152  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
153    "hbase.assignment.retry.immediately.maximum.attempts";
154  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;
155
156  /** Region in Transition metrics threshold time */
157  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
158    "hbase.metrics.rit.stuck.warning.threshold";
159  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
160  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";
161
162  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";
163
164  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;
165
166  /** The wait time in millis before checking again if the region's previous RS is back online */
167  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
168    "hbase.master.scp.retain.assignment.force.wait-interval";
169
170  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;
171
172  /**
173   * The number of times to check if the region's previous RS is back online, before giving up and
174   * proceeding with assignment on a new RS
175   */
176  public static final String FORCE_REGION_RETAINMENT_RETRIES =
177    "hbase.master.scp.retain.assignment.force.retries";
178
179  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
180
181  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
182  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");
183
184  private final MetricsAssignmentManager metrics;
185  private final RegionInTransitionChore ritChore;
186  private final DeadServerMetricRegionChore deadMetricChore;
187  private final MasterServices master;
188
189  private final AtomicBoolean running = new AtomicBoolean(false);
190  private final RegionStates regionStates = new RegionStates();
191  private final RegionStateStore regionStateStore;
192
193  /**
194   * When the operator uses this configuration option, any version between the current cluster
195   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
196   * auto-region movement. Auto-region movement here refers to auto-migration of system table
197   * regions to newer server versions. It is assumed that the configured range of versions does not
198   * require special handling of moving system table regions to higher versioned RegionServer. This
199   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
200   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
201   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then
202   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
203   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
204   * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system
205   * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by
206   * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
207   * introduced as part of HBASE-22923.
208   */
209  private final String minVersionToMoveSysTables;
210
211  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
212    "hbase.min.version.move.system.tables";
213  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";
214
215  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();
216
217  private final boolean shouldAssignRegionsWithFavoredNodes;
218  private final int assignDispatchWaitQueueMaxSize;
219  private final int assignDispatchWaitMillis;
220  private final int assignMaxAttempts;
221  private final int assignRetryImmediatelyMaxAttempts;
222
223  private final MasterRegion masterRegion;
224
225  private final Object checkIfShouldMoveSystemRegionLock = new Object();
226
227  private Thread assignThread;
228
229  private final boolean forceRegionRetainment;
230
231  private final long forceRegionRetainmentWaitInterval;
232
233  private final int forceRegionRetainmentRetries;
234
235  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
236    this(master, masterRegion, new RegionStateStore(master, masterRegion));
237  }
238
239  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
240    this.master = master;
241    this.regionStateStore = stateStore;
242    this.metrics = new MetricsAssignmentManager();
243    this.masterRegion = masterRegion;
244
245    final Configuration conf = master.getConfiguration();
246
247    // Only read favored nodes if using the favored nodes load balancer.
248    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
249      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
250
251    this.assignDispatchWaitMillis =
252      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
253    this.assignDispatchWaitQueueMaxSize =
254      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
255
256    this.assignMaxAttempts =
257      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
258    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
259      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
260
261    int ritChoreInterval =
262      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
263    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
264
265    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
266      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
267    if (deadRegionChoreInterval > 0) {
268      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
269    } else {
270      this.deadMetricChore = null;
271    }
272    minVersionToMoveSysTables =
273      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
274
275    forceRegionRetainment =
276      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
277    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
278      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
279    forceRegionRetainmentRetries =
280      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
281  }
282
283  private void mirrorMetaLocations() throws IOException, KeeperException {
284    // For compatibility, mirror the meta region state to zookeeper
285    // And we still need to use zookeeper to publish the meta region locations to region
286    // server, so they can serve as ClientMetaService
287    ZKWatcher zk = master.getZooKeeper();
288    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
289      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
290      return;
291    }
292    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
293    for (RegionStateNode metaState : metaStates) {
294      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
295        metaState.getRegionInfo().getReplicaId(), metaState.getState());
296    }
297    int replicaCount = metaStates.size();
298    // remove extra mirror locations
299    for (String znode : zk.getMetaReplicaNodes()) {
300      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
301      if (replicaId >= replicaCount) {
302        MetaTableLocator.deleteMetaLocation(zk, replicaId);
303      }
304    }
305  }
306
307  public void start() throws IOException, KeeperException {
308    if (!running.compareAndSet(false, true)) {
309      return;
310    }
311
312    LOG.trace("Starting assignment manager");
313
314    // Start the Assignment Thread
315    startAssignmentThread();
316    // load meta region states.
317    // here we are still in the early steps of active master startup. There is only one thread(us)
318    // can access AssignmentManager and create region node, so here we do not need to lock the
319    // region node.
320    try (ResultScanner scanner =
321      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
322      for (;;) {
323        Result result = scanner.next();
324        if (result == null) {
325          break;
326        }
327        RegionStateStore
328          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
329            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
330            regionNode.setState(state);
331            regionNode.setLastHost(lastHost);
332            regionNode.setRegionLocation(regionLocation);
333            regionNode.setOpenSeqNum(openSeqNum);
334            if (regionNode.getProcedure() != null) {
335              regionNode.getProcedure().stateLoaded(this, regionNode);
336            }
337            if (regionLocation != null) {
338              // TODO: this could lead to some orphan server state nodes, as it is possible that the
339              // region server is already dead and its SCP has already finished but we have
340              // persisted an opening state on this region server. Finally the TRSP will assign the
341              // region to another region server, so it will not cause critical problems, just waste
342              // some memory as no one will try to cleanup these orphan server state nodes.
343              regionStates.createServer(regionLocation);
344              regionStates.addRegionToServer(regionNode);
345            }
346            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
347              setMetaAssigned(regionInfo, state == State.OPEN);
348            }
349            LOG.debug("Loaded hbase:meta {}", regionNode);
350          }, result);
351      }
352    }
353    mirrorMetaLocations();
354  }
355
356  /**
357   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
358   * <p>
359   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
360   * method below. And it is also very important as now before submitting a TRSP, we need to attach
361   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
362   * the very beginning, before we start processing any procedures.
363   */
364  public void setupRIT(List<TransitRegionStateProcedure> procs) {
365    procs.forEach(proc -> {
366      RegionInfo regionInfo = proc.getRegion();
367      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
368      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
369      if (existingProc != null) {
370        // This is possible, as we will detach the procedure from the RSN before we
371        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
372        // during execution, at that time, the procedure has not been marked as done in the pv2
373        // framework yet, so it is possible that we schedule a new TRSP immediately and when
374        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
375        // make sure that, only the last one can take the charge, the previous ones should have all
376        // been finished already. So here we will compare the proc id, the greater one will win.
377        if (existingProc.getProcId() < proc.getProcId()) {
378          // the new one wins, unset and set it to the new one below
379          regionNode.unsetProcedure(existingProc);
380        } else {
381          // the old one wins, skip
382          return;
383        }
384      }
385      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
386      regionNode.setProcedure(proc);
387    });
388  }
389
390  public void stop() {
391    if (!running.compareAndSet(true, false)) {
392      return;
393    }
394
395    LOG.info("Stopping assignment manager");
396
397    // The AM is started before the procedure executor,
398    // but the actual work will be loaded/submitted only once we have the executor
399    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
400
401    // Remove the RIT chore
402    if (hasProcExecutor) {
403      master.getMasterProcedureExecutor().removeChore(this.ritChore);
404      if (this.deadMetricChore != null) {
405        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
406      }
407    }
408
409    // Stop the Assignment Thread
410    stopAssignmentThread();
411
412    // Stop the RegionStateStore
413    regionStates.clear();
414
415    // Update meta events (for testing)
416    if (hasProcExecutor) {
417      metaLoadEvent.suspend();
418      for (RegionInfo hri : getMetaRegionSet()) {
419        setMetaAssigned(hri, false);
420      }
421    }
422  }
423
424  public boolean isRunning() {
425    return running.get();
426  }
427
428  public Configuration getConfiguration() {
429    return master.getConfiguration();
430  }
431
432  public MetricsAssignmentManager getAssignmentManagerMetrics() {
433    return metrics;
434  }
435
436  private LoadBalancer getBalancer() {
437    return master.getLoadBalancer();
438  }
439
440  private FavoredNodesPromoter getFavoredNodePromoter() {
441    return (FavoredNodesPromoter) ((RSGroupBasedLoadBalancer) master.getLoadBalancer())
442      .getInternalBalancer();
443  }
444
445  private MasterProcedureEnv getProcedureEnvironment() {
446    return master.getMasterProcedureExecutor().getEnvironment();
447  }
448
449  private MasterProcedureScheduler getProcedureScheduler() {
450    return getProcedureEnvironment().getProcedureScheduler();
451  }
452
453  int getAssignMaxAttempts() {
454    return assignMaxAttempts;
455  }
456
457  public boolean isForceRegionRetainment() {
458    return forceRegionRetainment;
459  }
460
461  public long getForceRegionRetainmentWaitInterval() {
462    return forceRegionRetainmentWaitInterval;
463  }
464
465  public int getForceRegionRetainmentRetries() {
466    return forceRegionRetainmentRetries;
467  }
468
469  int getAssignRetryImmediatelyMaxAttempts() {
470    return assignRetryImmediatelyMaxAttempts;
471  }
472
473  public RegionStates getRegionStates() {
474    return regionStates;
475  }
476
477  /**
478   * Returns the regions hosted by the specified server.
479   * <p/>
480   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
481   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
482   * snapshot of the current region list for this server, which means, right after you get the
483   * region list, new regions may be moved to this server or some regions may be moved out from this
484   * server, so you should not use it critically if you need strong consistency.
485   */
486  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
487    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
488    if (serverInfo == null) {
489      return Collections.emptyList();
490    }
491    return serverInfo.getRegionInfoList();
492  }
493
494  private RegionInfo getRegionInfo(RegionStateNode rsn) {
495    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
496      // see the comments in markRegionAsSplit on why we need to do this converting.
497      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
498        .build();
499    } else {
500      return rsn.getRegionInfo();
501    }
502  }
503
504  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
505    boolean excludeOfflinedSplitParents) {
506    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
507    if (excludeOfflinedSplitParents) {
508      return stream.filter(rsn -> !rsn.isSplit());
509    } else {
510      return stream;
511    }
512  }
513
514  public List<RegionInfo> getTableRegions(TableName tableName,
515    boolean excludeOfflinedSplitParents) {
516    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
517      .collect(Collectors.toList());
518  }
519
520  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
521    boolean excludeOfflinedSplitParents) {
522    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
523      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
524      .collect(Collectors.toList());
525  }
526
527  public RegionStateStore getRegionStateStore() {
528    return regionStateStore;
529  }
530
531  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
532    return this.shouldAssignRegionsWithFavoredNodes
533      ? getFavoredNodePromoter().getFavoredNodes(regionInfo)
534      : ServerName.EMPTY_SERVER_LIST;
535  }
536
537  // ============================================================================================
538  // Table State Manager helpers
539  // ============================================================================================
540  private TableStateManager getTableStateManager() {
541    return master.getTableStateManager();
542  }
543
544  private boolean isTableEnabled(final TableName tableName) {
545    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
546  }
547
548  private boolean isTableDisabled(final TableName tableName) {
549    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
550      TableState.State.DISABLING);
551  }
552
553  // ============================================================================================
554  // META Helpers
555  // ============================================================================================
556  private boolean isMetaRegion(final RegionInfo regionInfo) {
557    return regionInfo.isMetaRegion();
558  }
559
560  public boolean isMetaRegion(final byte[] regionName) {
561    return getMetaRegionFromName(regionName) != null;
562  }
563
564  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
565    for (RegionInfo hri : getMetaRegionSet()) {
566      if (Bytes.equals(hri.getRegionName(), regionName)) {
567        return hri;
568      }
569    }
570    return null;
571  }
572
573  public boolean isCarryingMeta(final ServerName serverName) {
574    // TODO: handle multiple meta
575    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
576  }
577
578  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
579    // TODO: check for state?
580    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
581    return (node != null && serverName.equals(node.getRegionLocation()));
582  }
583
584  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
585    // if (regionInfo.isMetaRegion()) return regionInfo;
586    // TODO: handle multiple meta. if the region provided is not meta lookup
587    // which meta the region belongs to.
588    return RegionInfoBuilder.FIRST_META_REGIONINFO;
589  }
590
591  // TODO: handle multiple meta.
592  private static final Set<RegionInfo> META_REGION_SET =
593    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
594
595  public Set<RegionInfo> getMetaRegionSet() {
596    return META_REGION_SET;
597  }
598
599  // ============================================================================================
600  // META Event(s) helpers
601  // ============================================================================================
602  /**
603   * Notice that, this only means the meta region is available on a RS, but the AM may still be
604   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
605   * before checking this method, unless you can make sure that your piece of code can only be
606   * executed after AM builds the region states.
607   * @see #isMetaLoaded()
608   */
609  public boolean isMetaAssigned() {
610    return metaAssignEvent.isReady();
611  }
612
613  public boolean isMetaRegionInTransition() {
614    return !isMetaAssigned();
615  }
616
617  /**
618   * Notice that this event does not mean the AM has already finished region state rebuilding. See
619   * the comment of {@link #isMetaAssigned()} for more details.
620   * @see #isMetaAssigned()
621   */
622  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
623    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
624  }
625
626  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
627    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
628    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
629    if (assigned) {
630      metaAssignEvent.wake(getProcedureScheduler());
631    } else {
632      metaAssignEvent.suspend();
633    }
634  }
635
636  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
637    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
638    // TODO: handle multiple meta.
639    return metaAssignEvent;
640  }
641
642  /**
643   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
644   * @see #isMetaLoaded()
645   * @see #waitMetaAssigned(Procedure, RegionInfo)
646   */
647  public boolean waitMetaLoaded(Procedure<?> proc) {
648    return metaLoadEvent.suspendIfNotReady(proc);
649  }
650
651  /**
652   * This method will be called in master initialization method after calling
653   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
654   * procedures for offline regions, which may be conflict with creating table.
655   * <p/>
656   * This is a bit dirty, should be reconsidered after we decide whether to keep the
657   * {@link #processOfflineRegions()} method.
658   */
659  public void wakeMetaLoadedEvent() {
660    metaLoadEvent.wake(getProcedureScheduler());
661    assert isMetaLoaded() : "expected meta to be loaded";
662  }
663
664  /**
665   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
666   * @see #isMetaAssigned()
667   * @see #waitMetaLoaded(Procedure)
668   */
669  public boolean isMetaLoaded() {
670    return metaLoadEvent.isReady();
671  }
672
673  /**
674   * Start a new thread to check if there are region servers whose versions are higher than others.
675   * If so, move all system table regions to RS with the highest version to keep compatibility. The
676   * reason is, RS in new version may not be able to access RS in old version when there are some
677   * incompatible changes.
678   * <p>
679   * This method is called when a new RegionServer is added to cluster only.
680   * </p>
681   */
682  public void checkIfShouldMoveSystemRegionAsync() {
683    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
684    // it should 'move' the system tables from the old server to the new server but
685    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
686    if (this.master.getServerManager().countOfRegionServers() <= 1) {
687      return;
688    }
689    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
690    // childrenChanged notification came in before the nodeDeleted message and so this method
691    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
692    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
693    // crashed server and go move them before ServerCrashProcedure had a chance; could be
694    // dataloss too if WALs were not recovered.
695    new Thread(() -> {
696      try {
697        synchronized (checkIfShouldMoveSystemRegionLock) {
698          List<RegionPlan> plans = new ArrayList<>();
699          // TODO: I don't think this code does a good job if all servers in cluster have same
700          // version. It looks like it will schedule unnecessary moves.
701          for (ServerName server : getExcludedServersForSystemTable()) {
702            if (master.getServerManager().isServerDead(server)) {
703              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
704              // considers only online servers, the server could be queued for dead server
705              // processing. As region assignments for crashed server is handled by
706              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
707              // regular flow of LoadBalancer as a favored node and not to have this special
708              // handling.
709              continue;
710            }
711            List<RegionInfo> regionsShouldMove = getSystemTables(server);
712            if (!regionsShouldMove.isEmpty()) {
713              for (RegionInfo regionInfo : regionsShouldMove) {
714                // null value for dest forces destination server to be selected by balancer
715                RegionPlan plan = new RegionPlan(regionInfo, server, null);
716                if (regionInfo.isMetaRegion()) {
717                  // Must move meta region first.
718                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
719                    server);
720                  moveAsync(plan);
721                } else {
722                  plans.add(plan);
723                }
724              }
725            }
726            for (RegionPlan plan : plans) {
727              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
728                server);
729              moveAsync(plan);
730            }
731          }
732        }
733      } catch (Throwable t) {
734        LOG.error(t.toString(), t);
735      }
736    }).start();
737  }
738
739  private List<RegionInfo> getSystemTables(ServerName serverName) {
740    ServerStateNode serverNode = regionStates.getServerNode(serverName);
741    if (serverNode == null) {
742      return Collections.emptyList();
743    }
744    return serverNode.getSystemRegionInfoList();
745  }
746
747  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
748    throws HBaseIOException {
749    if (regionNode.getProcedure() != null) {
750      throw new HBaseIOException(
751        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
752    }
753    if (!regionNode.isInState(expectedStates)) {
754      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
755    }
756    if (isTableDisabled(regionNode.getTable())) {
757      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
758    }
759  }
760
761  /**
762   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
763   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
764   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
765   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
766   *      used when only when no checking needed.
767   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
768   */
769  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
770    boolean override, boolean force) throws IOException {
771    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
772    regionNode.lock();
773    try {
774      if (override) {
775        if (!force) {
776          preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
777        }
778        if (regionNode.getProcedure() != null) {
779          regionNode.unsetProcedure(regionNode.getProcedure());
780        }
781      } else {
782        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
783      }
784      assert regionNode.getProcedure() == null;
785      return regionNode.setProcedure(
786        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
787    } finally {
788      regionNode.unlock();
789    }
790  }
791
792  /**
793   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
794   * appriopriate state ripe for assign.
795   * @see #createAssignProcedure(RegionInfo, ServerName, boolean, boolean)
796   */
797  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
798    ServerName targetServer) {
799    regionNode.lock();
800    try {
801      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
802        regionNode.getRegionInfo(), targetServer));
803    } finally {
804      regionNode.unlock();
805    }
806  }
807
808  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
809    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false, false);
810    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
811    return proc.getProcId();
812  }
813
814  public long assign(RegionInfo regionInfo) throws IOException {
815    return assign(regionInfo, null);
816  }
817
818  /**
819   * Submits a procedure that assigns a region to a target server without waiting for it to finish
820   * @param regionInfo the region we would like to assign
821   * @param sn         target server name
822   */
823  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
824    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
825      createAssignProcedure(regionInfo, sn, false, false));
826  }
827
828  /**
829   * Submits a procedure that assigns a region without waiting for it to finish
830   * @param regionInfo the region we would like to assign
831   */
832  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
833    return assignAsync(regionInfo, null);
834  }
835
836  public long unassign(RegionInfo regionInfo) throws IOException {
837    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
838    if (regionNode == null) {
839      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
840    }
841    TransitRegionStateProcedure proc;
842    regionNode.lock();
843    try {
844      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
845      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
846      regionNode.setProcedure(proc);
847    } finally {
848      regionNode.unlock();
849    }
850    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
851    return proc.getProcId();
852  }
853
854  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
855    ServerName targetServer) throws HBaseIOException {
856    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
857    if (regionNode == null) {
858      throw new UnknownRegionException(
859        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
860    }
861    TransitRegionStateProcedure proc;
862    regionNode.lock();
863    try {
864      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
865      regionNode.checkOnline();
866      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
867      regionNode.setProcedure(proc);
868    } finally {
869      regionNode.unlock();
870    }
871    return proc;
872  }
873
874  public void move(RegionInfo regionInfo) throws IOException {
875    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
876    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
877  }
878
879  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
880    TransitRegionStateProcedure proc =
881      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
882    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
883  }
884
885  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
886    ServerName current =
887      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
888    if (current == null || !current.equals(regionPlan.getSource())) {
889      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
890        regionPlan, current == null ? "(null)" : current);
891      return null;
892    }
893    return moveAsync(regionPlan);
894  }
895
896  // ============================================================================================
897  // RegionTransition procedures helpers
898  // ============================================================================================
899
900  /**
901   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
902   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
903   *         to populate the assigns with targets chosen using round-robin (default balancer
904   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
905   *         AssignProcedure will ask the balancer for a new target, and so on.
906   */
907  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
908    List<ServerName> serversToExclude) {
909    if (hris.isEmpty()) {
910      return new TransitRegionStateProcedure[0];
911    }
912
913    if (
914      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
915    ) {
916      LOG.debug("Only one region server found and hence going ahead with the assignment");
917      serversToExclude = null;
918    }
919    try {
920      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
921      // a better job if it has all the assignments in the one lump.
922      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
923        this.master.getServerManager().createDestinationServersList(serversToExclude));
924      // Return mid-method!
925      return createAssignProcedures(assignments);
926    } catch (IOException hioe) {
927      LOG.warn("Failed roundRobinAssignment", hioe);
928    }
929    // If an error above, fall-through to this simpler assign. Last resort.
930    return createAssignProcedures(hris);
931  }
932
933  /**
934   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
935   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
936   *         to populate the assigns with targets chosen using round-robin (default balancer
937   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
938   *         AssignProcedure will ask the balancer for a new target, and so on.
939   */
940  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
941    return createRoundRobinAssignProcedures(hris, null);
942  }
943
944  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
945    if (left.getRegion().isMetaRegion()) {
946      if (right.getRegion().isMetaRegion()) {
947        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
948      }
949      return -1;
950    } else if (right.getRegion().isMetaRegion()) {
951      return +1;
952    }
953    if (left.getRegion().getTable().isSystemTable()) {
954      if (right.getRegion().getTable().isSystemTable()) {
955        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
956      }
957      return -1;
958    } else if (right.getRegion().getTable().isSystemTable()) {
959      return +1;
960    }
961    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
962  }
963
964  /**
965   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
966   * method is called from HBCK2.
967   * @return an assign or null
968   */
969  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override,
970    boolean force) {
971    TransitRegionStateProcedure trsp = null;
972    try {
973      trsp = createAssignProcedure(ri, null, override, force);
974    } catch (IOException ioe) {
975      LOG.info(
976        "Failed {} assign, override={}"
977          + (override ? "" : "; set override to by-pass state checks."),
978        ri.getEncodedName(), override, ioe);
979    }
980    return trsp;
981  }
982
983  /**
984   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
985   * @return an unassign or null
986   */
987  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override,
988    boolean force) {
989    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
990    TransitRegionStateProcedure trsp = null;
991    regionNode.lock();
992    try {
993      if (override) {
994        if (!force) {
995          preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
996        }
997        if (regionNode.getProcedure() != null) {
998          regionNode.unsetProcedure(regionNode.getProcedure());
999        }
1000      } else {
1001        // This is where we could throw an exception; i.e. override is false.
1002        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
1003      }
1004      assert regionNode.getProcedure() == null;
1005      trsp =
1006        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
1007      regionNode.setProcedure(trsp);
1008    } catch (IOException ioe) {
1009      // 'override' must be false here.
1010      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
1011        ri.getEncodedName(), ioe);
1012    } finally {
1013      regionNode.unlock();
1014    }
1015    return trsp;
1016  }
1017
1018  /**
1019   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
1020   * of caller is unable to do {@link #createAssignProcedures(Map)}.
1021   * <p/>
1022   * If no target server, at assign time, we will try to use the former location of the region if
1023   * one exists. This is how we 'retain' the old location across a server restart.
1024   * <p/>
1025   * Should only be called when you can make sure that no one can touch these regions other than
1026   * you. For example, when you are creating or enabling table. Presumes all Regions are in
1027   * appropriate state ripe for assign; no checking of Region state is done in here.
1028   * @see #createAssignProcedures(Map)
1029   */
1030  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
1031    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1032      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
1033      .toArray(TransitRegionStateProcedure[]::new);
1034  }
1035
1036  /**
1037   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
1038   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
1039   * Region state is done in here.
1040   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
1041   * @return Assignments made from the passed in <code>assignments</code>
1042   * @see #createAssignProcedures(List)
1043   */
1044  private TransitRegionStateProcedure[]
1045    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
1046    return assignments.entrySet().stream()
1047      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1048        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
1049      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
1050  }
1051
1052  // for creating unassign TRSP when disabling a table or closing excess region replicas
1053  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
1054    regionNode.lock();
1055    try {
1056      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1057        return null;
1058      }
1059      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
1060      // here for safety
1061      if (regionNode.getRegionInfo().isSplit()) {
1062        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1063        return null;
1064      }
1065      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1066      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1067      // shared lock for table all the time. So here we will unset it and when it is actually
1068      // executed, it will find that the attach procedure is not itself and quit immediately.
1069      if (regionNode.getProcedure() != null) {
1070        regionNode.unsetProcedure(regionNode.getProcedure());
1071      }
1072      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1073        regionNode.getRegionInfo()));
1074    } finally {
1075      regionNode.unlock();
1076    }
1077  }
1078
1079  /**
1080   * Called by DisableTableProcedure to unassign all the regions for a table.
1081   */
1082  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1083    return regionStates.getTableRegionStateNodes(tableName).stream()
1084      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1085      .toArray(TransitRegionStateProcedure[]::new);
1086  }
1087
1088  private int submitUnassignProcedure(TableName tableName,
1089    Function<RegionStateNode, Boolean> shouldSubmit, Consumer<RegionStateNode> logRIT,
1090    Consumer<TransitRegionStateProcedure> submit) {
1091    int inTransitionCount = 0;
1092    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1093      regionNode.lock();
1094      try {
1095        if (shouldSubmit.apply(regionNode)) {
1096          if (regionNode.isInTransition()) {
1097            logRIT.accept(regionNode);
1098            inTransitionCount++;
1099            continue;
1100          }
1101          if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1102            continue;
1103          }
1104          submit.accept(regionNode.setProcedure(TransitRegionStateProcedure
1105            .unassign(getProcedureEnvironment(), regionNode.getRegionInfo())));
1106        }
1107      } finally {
1108        regionNode.unlock();
1109      }
1110    }
1111    return inTransitionCount;
1112  }
1113
1114  /**
1115   * Called by DsiableTableProcedure to unassign all regions for a table. Will skip submit unassign
1116   * procedure if the region is in transition, so you may need to call this method multiple times.
1117   * @param tableName the table for closing excess region replicas
1118   * @param submit    for submitting procedure
1119   * @return the number of regions in transition that we can not schedule unassign procedures
1120   */
1121  public int submitUnassignProcedureForDisablingTable(TableName tableName,
1122    Consumer<TransitRegionStateProcedure> submit) {
1123    return submitUnassignProcedure(tableName, rn -> true,
1124      rn -> LOG.debug("skip scheduling unassign procedure for {} when closing table regions "
1125        + "for disabling since it is in transition", rn),
1126      submit);
1127  }
1128
1129  /**
1130   * Called by ModifyTableProcedure to unassign all the excess region replicas for a table. Will
1131   * skip submit unassign procedure if the region is in transition, so you may need to call this
1132   * method multiple times.
1133   * @param tableName       the table for closing excess region replicas
1134   * @param newReplicaCount the new replica count, should be less than current replica count
1135   * @param submit          for submitting procedure
1136   * @return the number of regions in transition that we can not schedule unassign procedures
1137   */
1138  public int submitUnassignProcedureForClosingExcessRegionReplicas(TableName tableName,
1139    int newReplicaCount, Consumer<TransitRegionStateProcedure> submit) {
1140    return submitUnassignProcedure(tableName,
1141      rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount,
1142      rn -> LOG.debug("skip scheduling unassign procedure for {} when closing excess region "
1143        + "replicas since it is in transition", rn),
1144      submit);
1145  }
1146
1147  private int numberOfUnclosedRegions(TableName tableName,
1148    Function<RegionStateNode, Boolean> shouldSubmit) {
1149    int unclosed = 0;
1150    for (RegionStateNode regionNode : regionStates.getTableRegionStateNodes(tableName)) {
1151      regionNode.lock();
1152      try {
1153        if (shouldSubmit.apply(regionNode)) {
1154          if (!regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1155            unclosed++;
1156          }
1157        }
1158      } finally {
1159        regionNode.unlock();
1160      }
1161    }
1162    return unclosed;
1163  }
1164
1165  public int numberOfUnclosedRegionsForDisabling(TableName tableName) {
1166    return numberOfUnclosedRegions(tableName, rn -> true);
1167  }
1168
1169  public int numberOfUnclosedExcessRegionReplicas(TableName tableName, int newReplicaCount) {
1170    return numberOfUnclosedRegions(tableName,
1171      rn -> rn.getRegionInfo().getReplicaId() >= newReplicaCount);
1172  }
1173
1174  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1175    final byte[] splitKey) throws IOException {
1176    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1177  }
1178
1179  public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
1180    throws IOException {
1181    return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
1182  }
1183
1184  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
1185    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1186  }
1187
1188  /**
1189   * Delete the region states. This is called by "DeleteTable"
1190   */
1191  public void deleteTable(final TableName tableName) throws IOException {
1192    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1193    regionStateStore.deleteRegions(regions);
1194    for (int i = 0; i < regions.size(); ++i) {
1195      final RegionInfo regionInfo = regions.get(i);
1196      regionStates.deleteRegion(regionInfo);
1197    }
1198  }
1199
1200  // ============================================================================================
1201  // RS Region Transition Report helpers
1202  // ============================================================================================
1203  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1204    ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException {
1205    for (RegionStateTransition transition : transitionList) {
1206      switch (transition.getTransitionCode()) {
1207        case OPENED:
1208        case FAILED_OPEN:
1209        case CLOSED:
1210          assert transition.getRegionInfoCount() == 1 : transition;
1211          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1212          long procId =
1213            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1214          updateRegionTransition(serverNode, transition.getTransitionCode(), hri,
1215            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1216          break;
1217        case READY_TO_SPLIT:
1218        case SPLIT:
1219        case SPLIT_REVERTED:
1220          assert transition.getRegionInfoCount() == 3 : transition;
1221          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1222          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1223          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1224          updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA,
1225            splitB);
1226          break;
1227        case READY_TO_MERGE:
1228        case MERGED:
1229        case MERGE_REVERTED:
1230          assert transition.getRegionInfoCount() == 3 : transition;
1231          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1232          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1233          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1234          updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA,
1235            mergeB);
1236          break;
1237      }
1238    }
1239  }
1240
1241  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1242    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1243    ReportRegionStateTransitionResponse.Builder builder =
1244      ReportRegionStateTransitionResponse.newBuilder();
1245    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1246    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1247    if (serverNode == null) {
1248      LOG.warn("No server node for {}", serverName);
1249      builder.setErrorMessage("No server node for " + serverName);
1250      return builder.build();
1251    }
1252    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1253    // we should not block other reportRegionStateTransition call from the same region server. This
1254    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1255    // also on the same region server and you hold the lock which blocks the
1256    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1257    // lock protection to wait for meta online...
1258    serverNode.readLock().lock();
1259    try {
1260      // we only accept reportRegionStateTransition if the region server is online, see the comment
1261      // above in submitServerCrash method and HBASE-21508 for more details.
1262      if (serverNode.isInState(ServerState.ONLINE)) {
1263        try {
1264          reportRegionStateTransition(builder, serverNode, req.getTransitionList());
1265        } catch (PleaseHoldException e) {
1266          LOG.trace("Failed transition ", e);
1267          throw e;
1268        } catch (UnsupportedOperationException | IOException e) {
1269          // TODO: at the moment we have a single error message and the RS will abort
1270          // if the master says that one of the region transitions failed.
1271          LOG.warn("Failed transition", e);
1272          builder.setErrorMessage("Failed transition " + e.getMessage());
1273        }
1274      } else {
1275        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1276          serverName);
1277        builder.setErrorMessage("You are dead");
1278      }
1279    } finally {
1280      serverNode.readLock().unlock();
1281    }
1282
1283    return builder.build();
1284  }
1285
1286  private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state,
1287    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1288    checkMetaLoaded(regionInfo);
1289
1290    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1291    if (regionNode == null) {
1292      // the table/region is gone. maybe a delete, split, merge
1293      throw new UnexpectedStateException(String.format(
1294        "Server %s was trying to transition region %s to %s. but Region is not known.",
1295        serverNode.getServerName(), regionInfo, state));
1296    }
1297    LOG.trace("Update region transition serverName={} region={} regionState={}",
1298      serverNode.getServerName(), regionNode, state);
1299
1300    regionNode.lock();
1301    try {
1302      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1303        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1304        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1305        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1306        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1307        // to CLOSED
1308        // These happen because on cluster shutdown, we currently let the RegionServers close
1309        // regions. This is the only time that region close is not run by the Master (so cluster
1310        // goes down fast). Consider changing it so Master runs all shutdowns.
1311        if (
1312          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1313        ) {
1314          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1315        } else {
1316          LOG.warn("No matching procedure found for {} transition on {} to {}",
1317            serverNode.getServerName(), regionNode, state);
1318        }
1319      }
1320    } finally {
1321      regionNode.unlock();
1322    }
1323  }
1324
1325  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1326    TransitionCode state, long seqId, long procId) throws IOException {
1327    ServerName serverName = serverNode.getServerName();
1328    TransitRegionStateProcedure proc = regionNode.getProcedure();
1329    if (proc == null) {
1330      return false;
1331    }
1332    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1333      serverName, state, seqId, procId);
1334    return true;
1335  }
1336
1337  private void updateRegionSplitTransition(final ServerStateNode serverNode,
1338    final TransitionCode state, final RegionInfo parent, final RegionInfo hriA,
1339    final RegionInfo hriB) throws IOException {
1340    checkMetaLoaded(parent);
1341
1342    if (state != TransitionCode.READY_TO_SPLIT) {
1343      throw new UnexpectedStateException(
1344        "unsupported split regionState=" + state + " for parent region " + parent
1345          + " maybe an old RS (< 2.0) had the operation in progress");
1346    }
1347
1348    // sanity check on the request
1349    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1350      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1351        + parent + " hriA=" + hriA + " hriB=" + hriB);
1352    }
1353
1354    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1355      LOG.warn("Split switch is off! skip split of " + parent);
1356      throw new DoNotRetryIOException(
1357        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1358    }
1359
1360    // Submit the Split procedure
1361    final byte[] splitKey = hriB.getStartKey();
1362    if (LOG.isDebugEnabled()) {
1363      LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent,
1364        Bytes.toStringBinary(splitKey));
1365    }
1366    // Processing this report happens asynchronously from other activities which can mutate
1367    // the region state. For example, a split procedure may already be running for this parent.
1368    // A split procedure cannot succeed if the parent region is no longer open, so we can
1369    // ignore it in that case.
1370    // Note that submitting more than one split procedure for a given region is
1371    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1372    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1373    // initialization and report failure with WARN level logging.
1374    RegionState parentState = regionStates.getRegionState(parent);
1375    if (parentState != null && parentState.isOpened()) {
1376      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1377    } else {
1378      LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open",
1379        serverNode.getServerName(), parent);
1380      return;
1381    }
1382
1383    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1384    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1385      throw new UnsupportedOperationException(
1386        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1387          parent.getShortNameToLog(), hriA, hriB));
1388    }
1389  }
1390
1391  private void updateRegionMergeTransition(final ServerStateNode serverNode,
1392    final TransitionCode state, final RegionInfo merged, final RegionInfo hriA,
1393    final RegionInfo hriB) throws IOException {
1394    checkMetaLoaded(merged);
1395
1396    if (state != TransitionCode.READY_TO_MERGE) {
1397      throw new UnexpectedStateException(
1398        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1399          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1400    }
1401
1402    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1403      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1404      throw new DoNotRetryIOException(
1405        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1406    }
1407
1408    // Submit the Merge procedure
1409    if (LOG.isDebugEnabled()) {
1410      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1411    }
1412    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1413
1414    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1415    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1416      throw new UnsupportedOperationException(
1417        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1418          merged, hriA, hriB));
1419    }
1420  }
1421
1422  // ============================================================================================
1423  // RS Status update (report online regions) helpers
1424  // ============================================================================================
1425  /**
1426   * The master will call this method when the RS send the regionServerReport(). The report will
1427   * contains the "online regions". This method will check the the online regions against the
1428   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1429   * because that there is no fencing between the reportRegionStateTransition method and
1430   * regionServerReport method, so there could be race and introduce inconsistency here, but
1431   * actually there is no problem.
1432   * <p/>
1433   * Please see HBASE-21421 and HBASE-21463 for more details.
1434   */
1435  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1436    if (!isRunning()) {
1437      return;
1438    }
1439    if (LOG.isTraceEnabled()) {
1440      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1441        regionNames.size(), isMetaLoaded(),
1442        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1443    }
1444
1445    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1446    if (serverNode == null) {
1447      LOG.warn("Got a report from server {} where its server node is null", serverName);
1448      return;
1449    }
1450    serverNode.readLock().lock();
1451    try {
1452      if (!serverNode.isInState(ServerState.ONLINE)) {
1453        LOG.warn("Got a report from a server result in state {}", serverNode);
1454        return;
1455      }
1456    } finally {
1457      serverNode.readLock().unlock();
1458    }
1459
1460    // Track the regionserver reported online regions in memory.
1461    synchronized (rsReports) {
1462      rsReports.put(serverName, regionNames);
1463    }
1464
1465    if (regionNames.isEmpty()) {
1466      // nothing to do if we don't have regions
1467      LOG.trace("no online region found on {}", serverName);
1468      return;
1469    }
1470    if (!isMetaLoaded()) {
1471      // we are still on startup, skip checking
1472      return;
1473    }
1474    // The Heartbeat tells us of what regions are on the region serve, check the state.
1475    checkOnlineRegionsReport(serverNode, regionNames);
1476  }
1477
1478  /**
1479   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1480   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1481   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1482   * accounting.
1483   */
1484  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1485    try {
1486      RegionInfo ri = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
1487      // Pass -1 for timeout. Means do not wait.
1488      ServerManager.closeRegionSilentlyAndWait(this.master.getAsyncClusterConnection(), sn, ri, -1);
1489    } catch (Exception e) {
1490      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1491    }
1492  }
1493
1494  /**
1495   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1496   * will tell the RegionServer to expediently close a Region we do not think it should have.
1497   */
1498  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1499    ServerName serverName = serverNode.getServerName();
1500    for (byte[] regionName : regionNames) {
1501      if (!isRunning()) {
1502        return;
1503      }
1504      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1505      if (regionNode == null) {
1506        String regionNameAsStr = Bytes.toStringBinary(regionName);
1507        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1508          serverName);
1509        closeRegionSilently(serverNode.getServerName(), regionName);
1510        continue;
1511      }
1512      final long lag = 1000;
1513      // This is just a fallback check designed to identify unexpected data inconsistencies, so we
1514      // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the
1515      // check. This will not cause any additional problems and also prevents the regionServerReport
1516      // call from being stuck for too long which may cause deadlock on region assignment.
1517      if (regionNode.tryLock()) {
1518        try {
1519          long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1520          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1521            // This is possible as a region server has just closed a region but the region server
1522            // report is generated before the closing, but arrive after the closing. Make sure
1523            // there
1524            // is some elapsed time so less false alarms.
1525            if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1526              LOG.warn("Reporting {} server does not match {} (time since last "
1527                + "update={}ms); closing...", serverName, regionNode, diff);
1528              closeRegionSilently(serverNode.getServerName(), regionName);
1529            }
1530          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1531            // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1532            // came in at about same time as a region transition. Make sure there is some
1533            // elapsed time so less false alarms.
1534            if (diff > lag) {
1535              LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1536                serverName, regionNode, diff);
1537            }
1538          }
1539        } finally {
1540          regionNode.unlock();
1541        }
1542      } else {
1543        LOG.warn(
1544          "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.",
1545          regionNode);
1546      }
1547    }
1548  }
1549
1550  // ============================================================================================
1551  // RIT chore
1552  // ============================================================================================
1553  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1554    public RegionInTransitionChore(final int timeoutMsec) {
1555      super(timeoutMsec);
1556    }
1557
1558    @Override
1559    protected void periodicExecute(final MasterProcedureEnv env) {
1560      final AssignmentManager am = env.getAssignmentManager();
1561
1562      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1563      if (ritStat.hasRegionsOverThreshold()) {
1564        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1565          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1566        }
1567      }
1568
1569      // update metrics
1570      am.updateRegionsInTransitionMetrics(ritStat);
1571    }
1572  }
1573
1574  private static class DeadServerMetricRegionChore
1575    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1576    public DeadServerMetricRegionChore(final int timeoutMsec) {
1577      super(timeoutMsec);
1578    }
1579
1580    @Override
1581    protected void periodicExecute(final MasterProcedureEnv env) {
1582      final ServerManager sm = env.getMasterServices().getServerManager();
1583      final AssignmentManager am = env.getAssignmentManager();
1584      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1585      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1586      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1587      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1588      // we miss some recently-dead server, we'll just see it next time.
1589      Set<ServerName> recentlyLiveServers = new HashSet<>();
1590      int deadRegions = 0, unknownRegions = 0;
1591      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1592        if (rsn.getState() != State.OPEN) {
1593          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1594        }
1595        // Do not need to acquire region state lock as this is only for showing metrics.
1596        ServerName sn = rsn.getRegionLocation();
1597        State state = rsn.getState();
1598        if (state != State.OPEN) {
1599          continue; // Mostly skipping RITs that are already being take care of.
1600        }
1601        if (sn == null) {
1602          ++unknownRegions; // Opened on null?
1603          continue;
1604        }
1605        if (recentlyLiveServers.contains(sn)) {
1606          continue;
1607        }
1608        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1609        switch (sls) {
1610          case LIVE:
1611            recentlyLiveServers.add(sn);
1612            break;
1613          case DEAD:
1614            ++deadRegions;
1615            break;
1616          case UNKNOWN:
1617            ++unknownRegions;
1618            break;
1619          default:
1620            throw new AssertionError("Unexpected " + sls);
1621        }
1622      }
1623      if (deadRegions > 0 || unknownRegions > 0) {
1624        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1625          deadRegions, unknownRegions);
1626      }
1627
1628      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1629    }
1630  }
1631
1632  public RegionInTransitionStat computeRegionInTransitionStat() {
1633    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1634    rit.update(this);
1635    return rit;
1636  }
1637
1638  public static class RegionInTransitionStat {
1639    private final int ritThreshold;
1640
1641    private HashMap<String, RegionState> ritsOverThreshold = null;
1642    private long statTimestamp;
1643    private long oldestRITTime = 0;
1644    private int totalRITsTwiceThreshold = 0;
1645    private int totalRITs = 0;
1646
1647    public RegionInTransitionStat(final Configuration conf) {
1648      this.ritThreshold =
1649        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1650    }
1651
1652    public int getRITThreshold() {
1653      return ritThreshold;
1654    }
1655
1656    public long getTimestamp() {
1657      return statTimestamp;
1658    }
1659
1660    public int getTotalRITs() {
1661      return totalRITs;
1662    }
1663
1664    public long getOldestRITTime() {
1665      return oldestRITTime;
1666    }
1667
1668    public int getTotalRITsOverThreshold() {
1669      Map<String, RegionState> m = this.ritsOverThreshold;
1670      return m != null ? m.size() : 0;
1671    }
1672
1673    public boolean hasRegionsTwiceOverThreshold() {
1674      return totalRITsTwiceThreshold > 0;
1675    }
1676
1677    public boolean hasRegionsOverThreshold() {
1678      Map<String, RegionState> m = this.ritsOverThreshold;
1679      return m != null && !m.isEmpty();
1680    }
1681
1682    public Collection<RegionState> getRegionOverThreshold() {
1683      Map<String, RegionState> m = this.ritsOverThreshold;
1684      return m != null ? m.values() : Collections.emptySet();
1685    }
1686
1687    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1688      Map<String, RegionState> m = this.ritsOverThreshold;
1689      return m != null && m.containsKey(regionInfo.getEncodedName());
1690    }
1691
1692    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1693      Map<String, RegionState> m = this.ritsOverThreshold;
1694      if (m == null) {
1695        return false;
1696      }
1697      final RegionState state = m.get(regionInfo.getEncodedName());
1698      if (state == null) {
1699        return false;
1700      }
1701      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1702    }
1703
1704    protected void update(final AssignmentManager am) {
1705      final RegionStates regionStates = am.getRegionStates();
1706      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1707      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1708      update(regionStates.getRegionFailedOpen(), statTimestamp);
1709
1710      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1711        LOG.debug("RITs over threshold: {}",
1712          ritsOverThreshold.entrySet().stream()
1713            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1714            .collect(Collectors.joining("\n")));
1715      }
1716    }
1717
1718    private void update(final Collection<RegionState> regions, final long currentTime) {
1719      for (RegionState state : regions) {
1720        totalRITs++;
1721        final long ritStartedMs = state.getStamp();
1722        if (ritStartedMs == 0) {
1723          // Don't output bogus values to metrics if they accidentally make it here.
1724          LOG.warn("The RIT {} has no start time", state.getRegion());
1725          continue;
1726        }
1727        final long ritTime = currentTime - ritStartedMs;
1728        if (ritTime > ritThreshold) {
1729          if (ritsOverThreshold == null) {
1730            ritsOverThreshold = new HashMap<String, RegionState>();
1731          }
1732          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1733          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1734        }
1735        if (oldestRITTime < ritTime) {
1736          oldestRITTime = ritTime;
1737        }
1738      }
1739    }
1740  }
1741
1742  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1743    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1744    metrics.updateRITCount(ritStat.getTotalRITs());
1745    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1746  }
1747
1748  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1749    metrics.updateDeadServerOpenRegions(deadRegions);
1750    metrics.updateUnknownServerOpenRegions(unknownRegions);
1751  }
1752
1753  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1754    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1755    // if (regionNode.isStuck()) {
1756    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1757  }
1758
1759  // ============================================================================================
1760  // TODO: Master load/bootstrap
1761  // ============================================================================================
1762  public void joinCluster() throws IOException {
1763    long startTime = System.nanoTime();
1764    LOG.debug("Joining cluster...");
1765
1766    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1767    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1768    // w/o meta.
1769    loadMeta();
1770
1771    while (master.getServerManager().countOfRegionServers() < 1) {
1772      LOG.info("Waiting for RegionServers to join; current count={}",
1773        master.getServerManager().countOfRegionServers());
1774      Threads.sleep(250);
1775    }
1776    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1777
1778    // Start the chores
1779    master.getMasterProcedureExecutor().addChore(this.ritChore);
1780    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1781
1782    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1783    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1784  }
1785
1786  /**
1787   * Create assign procedure for offline regions. Just follow the old
1788   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1789   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1790   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1791   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1792   * a legacy from long ago, as things are going really different now, maybe we do not need this
1793   * method any more. Need to revisit later.
1794   */
1795  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1796  // Needs to be done after the table state manager has been started.
1797  public void processOfflineRegions() {
1798    TransitRegionStateProcedure[] procs =
1799      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1800        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1801          rsn.lock();
1802          try {
1803            if (rsn.getProcedure() != null) {
1804              return null;
1805            } else {
1806              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1807                rsn.getRegionInfo(), null));
1808            }
1809          } finally {
1810            rsn.unlock();
1811          }
1812        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1813    if (procs.length > 0) {
1814      master.getMasterProcedureExecutor().submitProcedures(procs);
1815    }
1816  }
1817
1818  /*
1819   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1820   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1821   * convert Result into proper RegionInfo instances, but those would still need to be added into
1822   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1823   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1824   * AssignmentManager.regionStates.
1825   */
1826  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1827
1828    @Override
1829    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1830      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1831      if (
1832        state == null && regionLocation == null && lastHost == null
1833          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1834      ) {
1835        // This is a row with nothing in it.
1836        LOG.warn("Skipping empty row={}", result);
1837        return;
1838      }
1839      State localState = state;
1840      if (localState == null) {
1841        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1842        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1843        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1844        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1845        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1846        localState = State.OFFLINE;
1847      }
1848      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1849      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1850      // meta, all the related procedures can not be executed. The only exception is for meta
1851      // region related operations, but here we do not load the informations for meta region.
1852      regionNode.setState(localState);
1853      regionNode.setLastHost(lastHost);
1854      regionNode.setRegionLocation(regionLocation);
1855      regionNode.setOpenSeqNum(openSeqNum);
1856
1857      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1858      // RIT/ServerCrash handling should take care of the transiting regions.
1859      if (
1860        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1861      ) {
1862        assert regionLocation != null : "found null region location for " + regionNode;
1863        // TODO: this could lead to some orphan server state nodes, as it is possible that the
1864        // region server is already dead and its SCP has already finished but we have
1865        // persisted an opening state on this region server. Finally the TRSP will assign the
1866        // region to another region server, so it will not cause critical problems, just waste
1867        // some memory as no one will try to cleanup these orphan server state nodes.
1868        regionStates.createServer(regionLocation);
1869        regionStates.addRegionToServer(regionNode);
1870      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1871        regionStates.addToOfflineRegions(regionNode);
1872      }
1873      if (regionNode.getProcedure() != null) {
1874        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1875      }
1876    }
1877  };
1878
1879  /**
1880   * Attempt to load {@code regionInfo} from META, adding any results to the
1881   * {@link #regionStateStore} Is NOT aware of replica regions.
1882   * @param regionInfo the region to be loaded from META.
1883   * @throws IOException If some error occurs while querying META or parsing results.
1884   */
1885  public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
1886    throws IOException {
1887    final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
1888      ? regionInfo.getEncodedName()
1889      : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
1890        .getEncodedName();
1891    populateRegionStatesFromMeta(regionEncodedName);
1892  }
1893
1894  /**
1895   * Attempt to load {@code regionEncodedName} from META, adding any results to the
1896   * {@link #regionStateStore} Is NOT aware of replica regions.
1897   * @param regionEncodedName encoded name for the region to be loaded from META.
1898   * @throws IOException If some error occurs while querying META or parsing results.
1899   */
1900  public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException {
1901    final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1902    regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1903  }
1904
1905  private void loadMeta() throws IOException {
1906    // TODO: use a thread pool
1907    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1908  }
1909
1910  /**
1911   * Used to check if the meta loading is done.
1912   * <p/>
1913   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1914   * @param hri region to check if it is already rebuild
1915   * @throws PleaseHoldException if meta has not been loaded yet
1916   */
1917  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1918    if (!isRunning()) {
1919      throw new PleaseHoldException("AssignmentManager not running");
1920    }
1921    boolean meta = isMetaRegion(hri);
1922    boolean metaLoaded = isMetaLoaded();
1923    if (!meta && !metaLoaded) {
1924      throw new PleaseHoldException(
1925        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1926    }
1927  }
1928
1929  // ============================================================================================
1930  // TODO: Metrics
1931  // ============================================================================================
1932  public int getNumRegionsOpened() {
1933    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1934    return 0;
1935  }
1936
1937  /**
1938   * Usually run by the Master in reaction to server crash during normal processing. Can also be
1939   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
1940   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
1941   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
1942   * Servers' -- servers that are not online or in dead servers list, etc.)
1943   * @param force Set if the request came in externally over RPC (via hbck2). Force means run the
1944   *              SCP even if it seems as though there might be an outstanding SCP running.
1945   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1946   */
1947  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1948    // May be an 'Unknown Server' so handle case where serverNode is null.
1949    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1950    // Remove the in-memory rsReports result
1951    synchronized (rsReports) {
1952      rsReports.remove(serverName);
1953    }
1954    if (serverNode == null) {
1955      if (force) {
1956        LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName);
1957      } else {
1958        // for normal case, do not schedule SCP if ServerStateNode is null
1959        LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName);
1960        return Procedure.NO_PROC_ID;
1961      }
1962    }
1963
1964    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1965    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1966    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1967    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1968    // sure that, the region list fetched by SCP will not be changed any more.
1969    if (serverNode != null) {
1970      serverNode.writeLock().lock();
1971    }
1972    try {
1973
1974      boolean carryingMeta = isCarryingMeta(serverName);
1975      if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1976        if (force) {
1977          LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}",
1978            serverNode, carryingMeta, ServerState.ONLINE);
1979        } else {
1980          LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}",
1981            serverNode, carryingMeta, ServerState.ONLINE);
1982          return Procedure.NO_PROC_ID;
1983        }
1984      }
1985      MasterProcedureEnv mpe = procExec.getEnvironment();
1986      // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1987      // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1988      // serverName just-in-case. An SCP that is scheduled when the server is
1989      // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1990      ServerState oldState = null;
1991      if (serverNode != null) {
1992        oldState = serverNode.getState();
1993        serverNode.setState(ServerState.CRASHED);
1994      }
1995      ServerCrashProcedure scp = force
1996        ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)
1997        : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta);
1998      long pid = procExec.submitProcedure(scp);
1999      LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName,
2000        carryingMeta,
2001        serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
2002      return pid;
2003    } finally {
2004      if (serverNode != null) {
2005        serverNode.writeLock().unlock();
2006      }
2007    }
2008  }
2009
2010  public void offlineRegion(final RegionInfo regionInfo) {
2011    // TODO used by MasterRpcServices
2012    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
2013    if (node != null) {
2014      node.offline();
2015    }
2016  }
2017
2018  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
2019    // TODO used by TestSplitTransactionOnCluster.java
2020  }
2021
2022  public Map<ServerName, List<RegionInfo>>
2023    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
2024    return regionStates.getSnapShotOfAssignment(regions);
2025  }
2026
2027  // ============================================================================================
2028  // TODO: UTILS/HELPERS?
2029  // ============================================================================================
2030  /**
2031   * Used by the client (via master) to identify if all regions have the schema updates
2032   * @return Pair indicating the status of the alter command (pending/total)
2033   */
2034  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
2035    if (isTableDisabled(tableName)) {
2036      return new Pair<Integer, Integer>(0, 0);
2037    }
2038
2039    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2040    int ritCount = 0;
2041    for (RegionState regionState : states) {
2042      if (!regionState.isOpened() && !regionState.isSplit()) {
2043        ritCount++;
2044      }
2045    }
2046    return new Pair<Integer, Integer>(ritCount, states.size());
2047  }
2048
2049  // ============================================================================================
2050  // TODO: Region State In Transition
2051  // ============================================================================================
2052  public boolean hasRegionsInTransition() {
2053    return regionStates.hasRegionsInTransition();
2054  }
2055
2056  public List<RegionStateNode> getRegionsInTransition() {
2057    return regionStates.getRegionsInTransition();
2058  }
2059
2060  public List<RegionInfo> getAssignedRegions() {
2061    return regionStates.getAssignedRegions();
2062  }
2063
2064  /**
2065   * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}.
2066   */
2067  public RegionInfo getRegionInfo(final byte[] regionName) {
2068    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
2069    return regionState != null ? regionState.getRegionInfo() : null;
2070  }
2071
2072  /**
2073   * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}.
2074   */
2075  public RegionInfo getRegionInfo(final String encodedRegionName) {
2076    final RegionStateNode regionState =
2077      regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName);
2078    return regionState != null ? regionState.getRegionInfo() : null;
2079  }
2080
2081  // ============================================================================================
2082  // Expected states on region state transition.
2083  // Notice that there is expected states for transiting to OPENING state, this is because SCP.
2084  // See the comments in regionOpening method for more details.
2085  // ============================================================================================
2086  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
2087    State.OPEN // Retrying
2088  };
2089
2090  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
2091    State.CLOSING, // Retrying
2092    State.SPLITTING, // Offline the split parent
2093    State.MERGING // Offline the merge parents
2094  };
2095
2096  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
2097    State.CLOSED // Retrying
2098  };
2099
2100  // This is for manually scheduled region assign, can add other states later if we find out other
2101  // usages
2102  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
2103
2104  // We only allow unassign or move a region which is in OPEN state.
2105  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
2106
2107  // ============================================================================================
2108  // Region Status update
2109  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
2110  // and pre-assumptions are very tricky.
2111  // ============================================================================================
2112  private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
2113    RegionState.State newState, RegionState.State... expectedStates) {
2114    RegionState.State state = regionNode.getState();
2115    try {
2116      regionNode.transitionState(newState, expectedStates);
2117    } catch (UnexpectedStateException e) {
2118      return FutureUtils.failedFuture(e);
2119    }
2120    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2121    FutureUtils.addListener(future, (r, e) -> {
2122      if (e != null) {
2123        // revert
2124        regionNode.setState(state);
2125      }
2126    });
2127    return future;
2128  }
2129
2130  // should be called within the synchronized block of RegionStateNode
2131  CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
2132    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
2133    // update the region state, which means that the region could be in any state when we want to
2134    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
2135    return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
2136      ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
2137      // Here the server node could be null. For example, we want to assign the region to a given
2138      // region server and it crashes, and it is the region server which holds hbase:meta, then the
2139      // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But
2140      // after the SCP finishes, the server node will be removed, so when we arrive there, the
2141      // server
2142      // node will be null. This is not a big problem if we skip adding it, as later we will fail to
2143      // execute the remote procedure on the region server and then try to assign to another region
2144      // server
2145      if (serverNode != null) {
2146        serverNode.addRegion(regionNode);
2147      }
2148      // update the operation count metrics
2149      metrics.incrementOperationCounter();
2150    });
2151  }
2152
2153  // should be called under the RegionStateNode lock
2154  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
2155  // we will persist the FAILED_OPEN state into hbase:meta.
2156  CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
2157    RegionState.State state = regionNode.getState();
2158    ServerName regionLocation = regionNode.getRegionLocation();
2159    if (!giveUp) {
2160      if (regionLocation != null) {
2161        regionStates.removeRegionFromServer(regionLocation, regionNode);
2162      }
2163      return CompletableFuture.completedFuture(null);
2164    }
2165    regionNode.setState(State.FAILED_OPEN);
2166    regionNode.setRegionLocation(null);
2167    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2168    FutureUtils.addListener(future, (r, e) -> {
2169      if (e == null) {
2170        if (regionLocation != null) {
2171          regionStates.removeRegionFromServer(regionLocation, regionNode);
2172        }
2173      } else {
2174        // revert
2175        regionNode.setState(state);
2176        regionNode.setRegionLocation(regionLocation);
2177      }
2178    });
2179    return future;
2180  }
2181
2182  // should be called under the RegionStateNode lock
2183  CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
2184    return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
2185      .thenAccept(r -> {
2186        RegionInfo hri = regionNode.getRegionInfo();
2187        // Set meta has not initialized early. so people trying to create/edit tables will wait
2188        if (isMetaRegion(hri)) {
2189          setMetaAssigned(hri, false);
2190        }
2191        // update the operation count metrics
2192        metrics.incrementOperationCounter();
2193      });
2194  }
2195
2196  // for open and close, they will first be persist to the procedure store in
2197  // RegionRemoteProcedureBase. So here we will first change the in memory state as it is considered
2198  // as succeeded if the persistence to procedure store is succeeded, and then when the
2199  // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.
2200
2201  // should be called under the RegionStateNode lock
2202  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
2203    throws UnexpectedStateException {
2204    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
2205    RegionInfo regionInfo = regionNode.getRegionInfo();
2206    regionStates.addRegionToServer(regionNode);
2207    regionStates.removeFromFailedOpen(regionInfo);
2208  }
2209
2210  // should be called under the RegionStateNode lock
2211  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
2212    throws UnexpectedStateException {
2213    ServerName regionLocation = regionNode.getRegionLocation();
2214    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2215    regionNode.setRegionLocation(null);
2216    if (regionLocation != null) {
2217      regionNode.setLastHost(regionLocation);
2218      regionStates.removeRegionFromServer(regionLocation, regionNode);
2219    }
2220  }
2221
2222  // should be called under the RegionStateNode lock
2223  CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
2224    return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
2225      RegionInfo regionInfo = regionNode.getRegionInfo();
2226      if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2227        // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2228        // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2229        // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2230        // on table that contains state.
2231        setMetaAssigned(regionInfo, true);
2232      }
2233    });
2234  }
2235
2236  // should be called under the RegionStateNode lock
2237  // for SCP
2238  public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
2239    RegionState.State state = regionNode.getState();
2240    ServerName regionLocation = regionNode.getRegionLocation();
2241    regionNode.setState(State.ABNORMALLY_CLOSED);
2242    regionNode.setRegionLocation(null);
2243    CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
2244    FutureUtils.addListener(future, (r, e) -> {
2245      if (e == null) {
2246        if (regionLocation != null) {
2247          regionNode.setLastHost(regionLocation);
2248          regionStates.removeRegionFromServer(regionLocation, regionNode);
2249        }
2250      } else {
2251        // revert
2252        regionNode.setState(state);
2253        regionNode.setRegionLocation(regionLocation);
2254      }
2255    });
2256    return future;
2257  }
2258
2259  // ============================================================================================
2260  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2261  // ============================================================================================
2262
2263  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2264    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2265    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2266    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2267    // Update its state in regionStates to it shows as offline and split when read
2268    // later figuring what regions are in a table and what are not: see
2269    // regionStates#getRegionsOfTable
2270    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2271    node.setState(State.SPLIT);
2272    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2273    nodeA.setState(State.SPLITTING_NEW);
2274    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2275    nodeB.setState(State.SPLITTING_NEW);
2276
2277    TableDescriptor td = master.getTableDescriptors().get(parent.getTable());
2278    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2279    // without changing the one in the region node. This is a bit confusing but the region info
2280    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2281    // possible way to address this problem, or at least adding more comments about the trick to
2282    // deal with this problem, that when you want to filter out split parent, you need to check both
2283    // the RegionState on whether it is split, and also the region info. If one of them matches then
2284    // it is a split parent. And usually only one of them can match, as after restart, the region
2285    // state will be changed from SPLIT to CLOSED.
2286    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName, td);
2287    if (shouldAssignFavoredNodes(parent)) {
2288      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2289      getFavoredNodePromoter().generateFavoredNodesForDaughter(onlineServers, parent, daughterA,
2290        daughterB);
2291    }
2292  }
2293
2294  /**
2295   * When called here, the merge has happened. The merged regions have been unassigned and the above
2296   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2297   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2298   * below. Later they are deleted from the filesystem by the catalog janitor running against
2299   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2300   * (References are deleted after a compaction rewrites what the Reference points at but not until
2301   * the archiver chore runs, are the References removed).
2302   */
2303  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2304    RegionInfo[] mergeParents) throws IOException {
2305    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2306    node.setState(State.MERGED);
2307    for (RegionInfo ri : mergeParents) {
2308      regionStates.deleteRegion(ri);
2309    }
2310    TableDescriptor td = master.getTableDescriptors().get(child.getTable());
2311    regionStateStore.mergeRegions(child, mergeParents, serverName, td);
2312    if (shouldAssignFavoredNodes(child)) {
2313      getFavoredNodePromoter().generateFavoredNodesForMergedRegion(child, mergeParents);
2314    }
2315  }
2316
2317  /*
2318   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2319   * belongs to a non-system table.
2320   */
2321  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2322    return this.shouldAssignRegionsWithFavoredNodes
2323      && FavoredNodesManager.isFavoredNodeApplicable(region);
2324  }
2325
2326  // ============================================================================================
2327  // Assign Queue (Assign/Balance)
2328  // ============================================================================================
2329  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
2330  private final ReentrantLock assignQueueLock = new ReentrantLock();
2331  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2332
2333  /**
2334   * Add the assign operation to the assignment queue. The pending assignment operation will be
2335   * processed, and each region will be assigned by a server using the balancer.
2336   */
2337  protected void queueAssign(final RegionStateNode regionNode) {
2338    regionNode.getProcedureEvent().suspend();
2339
2340    // TODO: quick-start for meta and the other sys-tables?
2341    assignQueueLock.lock();
2342    try {
2343      pendingAssignQueue.add(regionNode);
2344      if (
2345        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2346          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2347      ) {
2348        assignQueueFullCond.signal();
2349      }
2350    } finally {
2351      assignQueueLock.unlock();
2352    }
2353  }
2354
2355  private void startAssignmentThread() {
2356    assignThread = new Thread(master.getServerName().toShortString()) {
2357      @Override
2358      public void run() {
2359        while (isRunning()) {
2360          processAssignQueue();
2361        }
2362        pendingAssignQueue.clear();
2363      }
2364    };
2365    assignThread.setDaemon(true);
2366    assignThread.start();
2367  }
2368
2369  private void stopAssignmentThread() {
2370    assignQueueSignal();
2371    try {
2372      while (assignThread.isAlive()) {
2373        assignQueueSignal();
2374        assignThread.join(250);
2375      }
2376    } catch (InterruptedException e) {
2377      LOG.warn("join interrupted", e);
2378      Thread.currentThread().interrupt();
2379    }
2380  }
2381
2382  private void assignQueueSignal() {
2383    assignQueueLock.lock();
2384    try {
2385      assignQueueFullCond.signal();
2386    } finally {
2387      assignQueueLock.unlock();
2388    }
2389  }
2390
2391  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
2392  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
2393    HashMap<RegionInfo, RegionStateNode> regions = null;
2394
2395    assignQueueLock.lock();
2396    try {
2397      if (pendingAssignQueue.isEmpty() && isRunning()) {
2398        assignQueueFullCond.await();
2399      }
2400
2401      if (!isRunning()) {
2402        return null;
2403      }
2404      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
2405      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
2406      for (RegionStateNode regionNode : pendingAssignQueue) {
2407        regions.put(regionNode.getRegionInfo(), regionNode);
2408      }
2409      pendingAssignQueue.clear();
2410    } catch (InterruptedException e) {
2411      LOG.warn("got interrupted ", e);
2412      Thread.currentThread().interrupt();
2413    } finally {
2414      assignQueueLock.unlock();
2415    }
2416    return regions;
2417  }
2418
2419  private void processAssignQueue() {
2420    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
2421    if (regions == null || regions.size() == 0 || !isRunning()) {
2422      return;
2423    }
2424
2425    if (LOG.isTraceEnabled()) {
2426      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
2427    }
2428
2429    // TODO: Optimize balancer. pass a RegionPlan?
2430    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
2431    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
2432    // Regions for system tables requiring reassignment
2433    final List<RegionInfo> systemHRIs = new ArrayList<>();
2434    for (RegionStateNode regionStateNode : regions.values()) {
2435      boolean sysTable = regionStateNode.isSystemTable();
2436      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
2437      if (regionStateNode.getRegionLocation() != null) {
2438        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
2439      } else {
2440        hris.add(regionStateNode.getRegionInfo());
2441      }
2442    }
2443
2444    // TODO: connect with the listener to invalidate the cache
2445
2446    // TODO use events
2447    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2448    for (int i = 0; servers.size() < 1; ++i) {
2449      // Report every fourth time around this loop; try not to flood log.
2450      if (i % 4 == 0) {
2451        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2452      }
2453
2454      if (!isRunning()) {
2455        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2456        return;
2457      }
2458      Threads.sleep(250);
2459      servers = master.getServerManager().createDestinationServersList();
2460    }
2461
2462    if (!systemHRIs.isEmpty()) {
2463      // System table regions requiring reassignment are present, get region servers
2464      // not available for system table regions
2465      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2466      List<ServerName> serversForSysTables =
2467        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2468      if (serversForSysTables.isEmpty()) {
2469        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
2470          + "instead considering all candidate servers!");
2471      }
2472      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
2473        + ", allServersCount=" + servers.size());
2474      processAssignmentPlans(regions, null, systemHRIs,
2475        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
2476          ? servers
2477          : serversForSysTables);
2478    }
2479
2480    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2481  }
2482
2483  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2484    List<RegionInfo> hirs) {
2485    for (RegionInfo ri : hirs) {
2486      if (
2487        regions.get(ri).getRegionLocation() != null
2488          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2489      ) {
2490        return true;
2491      }
2492    }
2493    return false;
2494  }
2495
2496  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2497    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2498    final List<ServerName> servers) {
2499    boolean isTraceEnabled = LOG.isTraceEnabled();
2500    if (isTraceEnabled) {
2501      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2502    }
2503
2504    final LoadBalancer balancer = getBalancer();
2505    // ask the balancer where to place regions
2506    if (retainMap != null && !retainMap.isEmpty()) {
2507      if (isTraceEnabled) {
2508        LOG.trace("retain assign regions=" + retainMap);
2509      }
2510      try {
2511        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2512      } catch (IOException e) {
2513        LOG.warn("unable to retain assignment", e);
2514        addToPendingAssignment(regions, retainMap.keySet());
2515      }
2516    }
2517
2518    // TODO: Do we need to split retain and round-robin?
2519    // the retain seems to fallback to round-robin/random if the region is not in the map.
2520    if (!hris.isEmpty()) {
2521      Collections.sort(hris, RegionInfo.COMPARATOR);
2522      if (isTraceEnabled) {
2523        LOG.trace("round robin regions=" + hris);
2524      }
2525      try {
2526        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2527      } catch (IOException e) {
2528        LOG.warn("unable to round-robin assignment", e);
2529        addToPendingAssignment(regions, hris);
2530      }
2531    }
2532  }
2533
2534  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2535    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2536    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2537    final long st = EnvironmentEdgeManager.currentTime();
2538
2539    if (plan.isEmpty()) {
2540      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2541    }
2542
2543    int evcount = 0;
2544    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2545      final ServerName server = entry.getKey();
2546      for (RegionInfo hri : entry.getValue()) {
2547        final RegionStateNode regionNode = regions.get(hri);
2548        regionNode.setRegionLocation(server);
2549        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2550          assignQueueLock.lock();
2551          try {
2552            pendingAssignQueue.add(regionNode);
2553          } finally {
2554            assignQueueLock.unlock();
2555          }
2556        } else {
2557          events[evcount++] = regionNode.getProcedureEvent();
2558        }
2559      }
2560    }
2561    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2562
2563    final long et = EnvironmentEdgeManager.currentTime();
2564    if (LOG.isTraceEnabled()) {
2565      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2566    }
2567  }
2568
2569  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2570    final Collection<RegionInfo> pendingRegions) {
2571    assignQueueLock.lock();
2572    try {
2573      for (RegionInfo hri : pendingRegions) {
2574        pendingAssignQueue.add(regions.get(hri));
2575      }
2576    } finally {
2577      assignQueueLock.unlock();
2578    }
2579  }
2580
2581  /**
2582   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2583   * where system table regions should not be assigned to. For system table, we must assign regions
2584   * to a server with highest version. However, we can disable this exclusion using config:
2585   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2586   * available with definition of minVersionToMoveSysTables.
2587   * @return List of Excluded servers for System table regions.
2588   */
2589  public List<ServerName> getExcludedServersForSystemTable() {
2590    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2591    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2592    // RegionServerInfo that includes Version.
2593    List<Pair<ServerName, String>> serverList =
2594      master.getServerManager().getOnlineServersList().stream()
2595        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2596    if (serverList.isEmpty()) {
2597      return new ArrayList<>();
2598    }
2599    String highestVersion = Collections
2600      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2601      .getSecond();
2602    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2603      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2604      if (comparedValue > 0) {
2605        return new ArrayList<>();
2606      }
2607    }
2608    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2609      .map(Pair::getFirst).collect(Collectors.toList());
2610  }
2611
2612  MasterServices getMaster() {
2613    return master;
2614  }
2615
2616  /** Returns a snapshot of rsReports */
2617  public Map<ServerName, Set<byte[]>> getRSReports() {
2618    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2619    synchronized (rsReports) {
2620      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2621    }
2622    return rsReportsSnapshot;
2623  }
2624
2625  /**
2626   * Provide regions state count for given table. e.g howmany regions of give table are
2627   * opened/closed/rit etc
2628   * @param tableName TableName
2629   * @return region states count
2630   */
2631  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2632    int openRegionsCount = 0;
2633    int closedRegionCount = 0;
2634    int ritCount = 0;
2635    int splitRegionCount = 0;
2636    int totalRegionCount = 0;
2637    if (!isTableDisabled(tableName)) {
2638      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2639      for (RegionState regionState : states) {
2640        if (regionState.isOpened()) {
2641          openRegionsCount++;
2642        } else if (regionState.isClosed()) {
2643          closedRegionCount++;
2644        } else if (regionState.isSplit()) {
2645          splitRegionCount++;
2646        }
2647      }
2648      totalRegionCount = states.size();
2649      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2650    }
2651    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2652      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2653      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2654  }
2655
2656}