001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.locks.Condition;
034import java.util.concurrent.locks.ReentrantLock;
035import java.util.stream.Collectors;
036import java.util.stream.Stream;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.HBaseIOException;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.MetaTableAccessor;
042import org.apache.hadoop.hbase.PleaseHoldException;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.UnknownRegionException;
046import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
047import org.apache.hadoop.hbase.client.MasterSwitchType;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.RegionReplicaUtil;
051import org.apache.hadoop.hbase.client.RegionStatesCount;
052import org.apache.hadoop.hbase.client.Result;
053import org.apache.hadoop.hbase.client.ResultScanner;
054import org.apache.hadoop.hbase.client.Scan;
055import org.apache.hadoop.hbase.client.TableState;
056import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
057import org.apache.hadoop.hbase.favored.FavoredNodesManager;
058import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
059import org.apache.hadoop.hbase.master.LoadBalancer;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
062import org.apache.hadoop.hbase.master.RegionPlan;
063import org.apache.hadoop.hbase.master.RegionState;
064import org.apache.hadoop.hbase.master.RegionState.State;
065import org.apache.hadoop.hbase.master.ServerManager;
066import org.apache.hadoop.hbase.master.TableStateManager;
067import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer;
068import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
070import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
071import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
072import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
073import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
074import org.apache.hadoop.hbase.master.region.MasterRegion;
075import org.apache.hadoop.hbase.procedure2.Procedure;
076import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
077import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
078import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
079import org.apache.hadoop.hbase.procedure2.util.StringUtils;
080import org.apache.hadoop.hbase.regionserver.SequenceId;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
083import org.apache.hadoop.hbase.util.Pair;
084import org.apache.hadoop.hbase.util.Threads;
085import org.apache.hadoop.hbase.util.VersionInfo;
086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
087import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
088import org.apache.yetus.audience.InterfaceAudience;
089import org.apache.zookeeper.KeeperException;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
098
099/**
100 * The AssignmentManager is the coordinator for region assign/unassign operations.
101 * <ul>
102 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li>
103 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li>
104 * </ul>
105 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split,
106 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. Unassigns
107 * are triggered by DisableTable, Split, Merge
108 */
109@InterfaceAudience.Private
110public class AssignmentManager {
  /** Class-wide logger (static, so shared by every AssignmentManager instance). */
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class);

  // TODO: AMv2
  // - handle region migration from hbase1 to hbase2.
  // - handle sys table assignment first (e.g. acl, namespace)
  // - handle table priorities
  // - If ServerBusyException trying to update hbase:meta, we abort the Master
  // See updateRegionLocation in RegionStateStore.
  //
  // See also
  // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5
  // for other TODOs.
123
  /** Configuration key for the size of the assignment bootstrap thread pool. */
  public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY =
    "hbase.assignment.bootstrap.thread.pool.size";

  /**
   * Configuration key for the assign dispatch wait time, in milliseconds. Read once by the
   * constructor. NOTE(review): presumably how long assigns are batched before being dispatched —
   * confirm against the dispatch loop.
   */
  public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY =
    "hbase.assignment.dispatch.wait.msec";
  /** Default for {@link #ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY}. */
  private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150;

  /** Configuration key for the maximum size of the assign dispatch wait queue. */
  public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY =
    "hbase.assignment.dispatch.wait.queue.max.size";
  /** Default for {@link #ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY}. */
  private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100;

  /** Configuration key for the interval, in milliseconds, of the region-in-transition chore. */
  public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.rit.chore.interval.msec";
  /** Default for {@link #RIT_CHORE_INTERVAL_MSEC_CONF_KEY}: one minute. */
  private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000;

  /**
   * Configuration key for the interval, in milliseconds, of the dead-server region metric chore.
   * The constructor does not create that chore at all when the value is zero or negative.
   */
  public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY =
    "hbase.assignment.dead.region.metric.chore.interval.msec";
  /** Default for {@link #DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY}: two minutes. */
  private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000;

  /**
   * Configuration key for the maximum number of assign attempts. The constructor raises values
   * below 1 to 1.
   */
  public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts";
  /** Default for {@link #ASSIGN_MAX_ATTEMPTS}: Integer.MAX_VALUE, i.e. effectively unbounded. */
  private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE;

  /**
   * Configuration key for the number of assign attempts that may be retried immediately.
   * NOTE(review): presumably later retries back off instead — confirm against the retry logic.
   */
  public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS =
    "hbase.assignment.retry.immediately.maximum.attempts";
  /** Default for {@link #ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS}. */
  private static final int DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3;

  /**
   * Region in Transition metrics threshold time: configuration key for how long a region may stay
   * in transition before it is reported as stuck (presumably milliseconds — confirm).
   */
  public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
    "hbase.metrics.rit.stuck.warning.threshold";
  /** Default for {@link #METRICS_RIT_STUCK_WARNING_THRESHOLD}. */
  private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
  /** Text prefix, presumably used for messages about a region found in an unexpected state. */
  public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

  /**
   * Configuration key enabling forced region retainment: waiting for a region's previous region
   * server to come back online instead of immediately assigning the region elsewhere. See
   * {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL} and {@link #FORCE_REGION_RETAINMENT_RETRIES}.
   */
  public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

  /** Default for {@link #FORCE_REGION_RETAINMENT}: disabled. */
  public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

  /** The wait time in millis before checking again if the region's previous RS is back online */
  public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
    "hbase.master.scp.retain.assignment.force.wait-interval";

  /** Default for {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}, in milliseconds. */
  public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

  /**
   * The number of times to check if the region's previous RS is back online, before giving up and
   * proceeding with assignment on a new RS
   */
  public static final String FORCE_REGION_RETAINMENT_RETRIES =
    "hbase.master.scp.retain.assignment.force.retries";

  /** Default for {@link #FORCE_REGION_RETAINMENT_RETRIES}. */
  public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;
174
  /** Ready while the meta region is assigned; queried by {@link #isMetaAssigned()}. */
  private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
  /** Ready once the region states are rebuilt; queried by {@link #isMetaLoaded()}. */
  private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

  private final MetricsAssignmentManager metrics;
  /** Region-in-transition chore; removed from the procedure executor by {@link #stop()}. */
  private final RegionInTransitionChore ritChore;
  /** Dead-server region metric chore, or null when its configured interval is not positive. */
  private final DeadServerMetricRegionChore deadMetricChore;
  private final MasterServices master;

  /** True between a successful {@link #start()} and the following {@link #stop()}. */
  private final AtomicBoolean running = new AtomicBoolean(false);
  /** In-memory region and server state. */
  private final RegionStates regionStates = new RegionStates();
  private final RegionStateStore regionStateStore;

  /**
   * When the operator uses this configuration option, any version between the current cluster
   * version and the value of "hbase.min.version.move.system.tables" does not trigger any
   * auto-region movement. Auto-region movement here refers to auto-migration of system table
   * regions to newer server versions. It is assumed that the configured range of versions does not
   * require special handling of moving system table regions to higher versioned RegionServer. This
   * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume
   * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as
   * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (lower than 2.0.0), then
   * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to
   * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one
   * RegionServer on 1.4.0 cluster to 2.2.0 (higher than 2.0.0), then AssignmentManager will move
   * all system table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done
   * by {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is
   * introduced as part of HBASE-22923.
   */
  private final String minVersionToMoveSysTables;

  /** Configuration key backing {@link #minVersionToMoveSysTables}. */
  private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG =
    "hbase.min.version.move.system.tables";
  /** Default for {@link #MIN_VERSION_MOVE_SYS_TABLES_CONFIG}: the empty string. */
  private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = "";

  // NOTE(review): presumably maps each region server to the region names from its latest report —
  // confirm at the use sites. The HashMap itself is not thread-safe, and byte[] elements use
  // identity equals/hashCode inside the HashSet values.
  private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>();

  /** True when the configured load balancer is a FavoredStochasticBalancer (or subclass). */
  private final boolean shouldAssignRegionsWithFavoredNodes;
  private final int assignDispatchWaitQueueMaxSize;
  private final int assignDispatchWaitMillis;
  /** Maximum number of assign attempts; never below 1. */
  private final int assignMaxAttempts;
  private final int assignRetryImmediatelyMaxAttempts;

  /** Local master region that {@link #start()} scans to rebuild the meta region state. */
  private final MasterRegion masterRegion;

  /** Serializes the background threads started by checkIfShouldMoveSystemRegionAsync(). */
  private final Object checkIfShouldMoveSystemRegionLock = new Object();

  // NOTE(review): presumably managed by startAssignmentThread()/stopAssignmentThread() — confirm.
  private Thread assignThread;

  private final boolean forceRegionRetainment;

  private final long forceRegionRetainmentWaitInterval;

  private final int forceRegionRetainmentRetries;
228
229  public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
230    this(master, masterRegion, new RegionStateStore(master, masterRegion));
231  }
232
233  AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
234    this.master = master;
235    this.regionStateStore = stateStore;
236    this.metrics = new MetricsAssignmentManager();
237    this.masterRegion = masterRegion;
238
239    final Configuration conf = master.getConfiguration();
240
241    // Only read favored nodes if using the favored nodes load balancer.
242    this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class
243      .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class));
244
245    this.assignDispatchWaitMillis =
246      conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
247    this.assignDispatchWaitQueueMaxSize =
248      conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);
249
250    this.assignMaxAttempts =
251      Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
252    this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS,
253      DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);
254
255    int ritChoreInterval =
256      conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC);
257    this.ritChore = new RegionInTransitionChore(ritChoreInterval);
258
259    int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
260      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
261    if (deadRegionChoreInterval > 0) {
262      this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval);
263    } else {
264      this.deadMetricChore = null;
265    }
266    minVersionToMoveSysTables =
267      conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);
268
269    forceRegionRetainment =
270      conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
271    forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
272      DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
273    forceRegionRetainmentRetries =
274      conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
275  }
276
277  private void mirrorMetaLocations() throws IOException, KeeperException {
278    // For compatibility, mirror the meta region state to zookeeper
279    // And we still need to use zookeeper to publish the meta region locations to region
280    // server, so they can serve as ClientMetaService
281    ZKWatcher zk = master.getZooKeeper();
282    if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) {
283      // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed
284      return;
285    }
286    Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes();
287    for (RegionStateNode metaState : metaStates) {
288      MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(),
289        metaState.getRegionInfo().getReplicaId(), metaState.getState());
290    }
291    int replicaCount = metaStates.size();
292    // remove extra mirror locations
293    for (String znode : zk.getMetaReplicaNodes()) {
294      int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode);
295      if (replicaId >= replicaCount) {
296        MetaTableLocator.deleteMetaLocation(zk, replicaId);
297      }
298    }
299  }
300
301  public void start() throws IOException, KeeperException {
302    if (!running.compareAndSet(false, true)) {
303      return;
304    }
305
306    LOG.trace("Starting assignment manager");
307
308    // Start the Assignment Thread
309    startAssignmentThread();
310    // load meta region states.
311    // here we are still in the early steps of active master startup. There is only one thread(us)
312    // can access AssignmentManager and create region node, so here we do not need to lock the
313    // region node.
314    try (ResultScanner scanner =
315      masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) {
316      for (;;) {
317        Result result = scanner.next();
318        if (result == null) {
319          break;
320        }
321        RegionStateStore
322          .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> {
323            RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
324            regionNode.setState(state);
325            regionNode.setLastHost(lastHost);
326            regionNode.setRegionLocation(regionLocation);
327            regionNode.setOpenSeqNum(openSeqNum);
328            if (regionNode.getProcedure() != null) {
329              regionNode.getProcedure().stateLoaded(this, regionNode);
330            }
331            if (regionLocation != null) {
332              // TODO: this could lead to some orphan server state nodes, as it is possible that the
333              // region server is already dead and its SCP has already finished but we have
334              // persisted an opening state on this region server. Finally the TRSP will assign the
335              // region to another region server, so it will not cause critical problems, just waste
336              // some memory as no one will try to cleanup these orphan server state nodes.
337              regionStates.createServer(regionLocation);
338              regionStates.addRegionToServer(regionNode);
339            }
340            if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) {
341              setMetaAssigned(regionInfo, state == State.OPEN);
342            }
343            LOG.debug("Loaded hbase:meta {}", regionNode);
344          }, result);
345      }
346    }
347    mirrorMetaLocations();
348  }
349
350  /**
351   * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode.
352   * <p>
353   * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta
354   * method below. And it is also very important as now before submitting a TRSP, we need to attach
355   * it to the RegionStateNode, which acts like a guard, so we need to restore this information at
356   * the very beginning, before we start processing any procedures.
357   */
358  public void setupRIT(List<TransitRegionStateProcedure> procs) {
359    procs.forEach(proc -> {
360      RegionInfo regionInfo = proc.getRegion();
361      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
362      TransitRegionStateProcedure existingProc = regionNode.getProcedure();
363      if (existingProc != null) {
364        // This is possible, as we will detach the procedure from the RSN before we
365        // actually finish the procedure. This is because that, we will detach the TRSP from the RSN
366        // during execution, at that time, the procedure has not been marked as done in the pv2
367        // framework yet, so it is possible that we schedule a new TRSP immediately and when
368        // arriving here, we will find out that there are multiple TRSPs for the region. But we can
369        // make sure that, only the last one can take the charge, the previous ones should have all
370        // been finished already. So here we will compare the proc id, the greater one will win.
371        if (existingProc.getProcId() < proc.getProcId()) {
372          // the new one wins, unset and set it to the new one below
373          regionNode.unsetProcedure(existingProc);
374        } else {
375          // the old one wins, skip
376          return;
377        }
378      }
379      LOG.info("Attach {} to {} to restore RIT", proc, regionNode);
380      regionNode.setProcedure(proc);
381    });
382  }
383
384  public void stop() {
385    if (!running.compareAndSet(true, false)) {
386      return;
387    }
388
389    LOG.info("Stopping assignment manager");
390
391    // The AM is started before the procedure executor,
392    // but the actual work will be loaded/submitted only once we have the executor
393    final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null;
394
395    // Remove the RIT chore
396    if (hasProcExecutor) {
397      master.getMasterProcedureExecutor().removeChore(this.ritChore);
398      if (this.deadMetricChore != null) {
399        master.getMasterProcedureExecutor().removeChore(this.deadMetricChore);
400      }
401    }
402
403    // Stop the Assignment Thread
404    stopAssignmentThread();
405
406    // Stop the RegionStateStore
407    regionStates.clear();
408
409    // Update meta events (for testing)
410    if (hasProcExecutor) {
411      metaLoadEvent.suspend();
412      for (RegionInfo hri : getMetaRegionSet()) {
413        setMetaAssigned(hri, false);
414      }
415    }
416  }
417
418  public boolean isRunning() {
419    return running.get();
420  }
421
422  public Configuration getConfiguration() {
423    return master.getConfiguration();
424  }
425
426  public MetricsAssignmentManager getAssignmentManagerMetrics() {
427    return metrics;
428  }
429
430  private LoadBalancer getBalancer() {
431    return master.getLoadBalancer();
432  }
433
434  private MasterProcedureEnv getProcedureEnvironment() {
435    return master.getMasterProcedureExecutor().getEnvironment();
436  }
437
438  private MasterProcedureScheduler getProcedureScheduler() {
439    return getProcedureEnvironment().getProcedureScheduler();
440  }
441
442  int getAssignMaxAttempts() {
443    return assignMaxAttempts;
444  }
445
446  public boolean isForceRegionRetainment() {
447    return forceRegionRetainment;
448  }
449
450  public long getForceRegionRetainmentWaitInterval() {
451    return forceRegionRetainmentWaitInterval;
452  }
453
454  public int getForceRegionRetainmentRetries() {
455    return forceRegionRetainmentRetries;
456  }
457
458  int getAssignRetryImmediatelyMaxAttempts() {
459    return assignRetryImmediatelyMaxAttempts;
460  }
461
462  public RegionStates getRegionStates() {
463    return regionStates;
464  }
465
466  /**
467   * Returns the regions hosted by the specified server.
468   * <p/>
469   * Notice that, for SCP, after we submit the SCP, no one can change the region list for the
470   * ServerStateNode so we do not need any locks here. And for other usage, this can only give you a
471   * snapshot of the current region list for this server, which means, right after you get the
472   * region list, new regions may be moved to this server or some regions may be moved out from this
473   * server, so you should not use it critically if you need strong consistency.
474   */
475  public List<RegionInfo> getRegionsOnServer(ServerName serverName) {
476    ServerStateNode serverInfo = regionStates.getServerNode(serverName);
477    if (serverInfo == null) {
478      return Collections.emptyList();
479    }
480    return serverInfo.getRegionInfoList();
481  }
482
483  private RegionInfo getRegionInfo(RegionStateNode rsn) {
484    if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) {
485      // see the comments in markRegionAsSplit on why we need to do this converting.
486      return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true)
487        .build();
488    } else {
489      return rsn.getRegionInfo();
490    }
491  }
492
493  private Stream<RegionStateNode> getRegionStateNodes(TableName tableName,
494    boolean excludeOfflinedSplitParents) {
495    Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream();
496    if (excludeOfflinedSplitParents) {
497      return stream.filter(rsn -> !rsn.isSplit());
498    } else {
499      return stream;
500    }
501  }
502
503  public List<RegionInfo> getTableRegions(TableName tableName,
504    boolean excludeOfflinedSplitParents) {
505    return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo)
506      .collect(Collectors.toList());
507  }
508
509  public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName,
510    boolean excludeOfflinedSplitParents) {
511    return getRegionStateNodes(tableName, excludeOfflinedSplitParents)
512      .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation()))
513      .collect(Collectors.toList());
514  }
515
516  public RegionStateStore getRegionStateStore() {
517    return regionStateStore;
518  }
519
520  public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) {
521    return this.shouldAssignRegionsWithFavoredNodes
522      ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo)
523      : ServerName.EMPTY_SERVER_LIST;
524  }
525
526  // ============================================================================================
527  // Table State Manager helpers
528  // ============================================================================================
529  private TableStateManager getTableStateManager() {
530    return master.getTableStateManager();
531  }
532
533  private boolean isTableEnabled(final TableName tableName) {
534    return getTableStateManager().isTableState(tableName, TableState.State.ENABLED);
535  }
536
537  private boolean isTableDisabled(final TableName tableName) {
538    return getTableStateManager().isTableState(tableName, TableState.State.DISABLED,
539      TableState.State.DISABLING);
540  }
541
542  // ============================================================================================
543  // META Helpers
544  // ============================================================================================
545  private boolean isMetaRegion(final RegionInfo regionInfo) {
546    return regionInfo.isMetaRegion();
547  }
548
549  public boolean isMetaRegion(final byte[] regionName) {
550    return getMetaRegionFromName(regionName) != null;
551  }
552
553  public RegionInfo getMetaRegionFromName(final byte[] regionName) {
554    for (RegionInfo hri : getMetaRegionSet()) {
555      if (Bytes.equals(hri.getRegionName(), regionName)) {
556        return hri;
557      }
558    }
559    return null;
560  }
561
562  public boolean isCarryingMeta(final ServerName serverName) {
563    // TODO: handle multiple meta
564    return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO);
565  }
566
567  private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) {
568    // TODO: check for state?
569    final RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
570    return (node != null && serverName.equals(node.getRegionLocation()));
571  }
572
573  private RegionInfo getMetaForRegion(final RegionInfo regionInfo) {
574    // if (regionInfo.isMetaRegion()) return regionInfo;
575    // TODO: handle multiple meta. if the region provided is not meta lookup
576    // which meta the region belongs to.
577    return RegionInfoBuilder.FIRST_META_REGIONINFO;
578  }
579
580  // TODO: handle multiple meta.
581  private static final Set<RegionInfo> META_REGION_SET =
582    Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO);
583
584  public Set<RegionInfo> getMetaRegionSet() {
585    return META_REGION_SET;
586  }
587
588  // ============================================================================================
589  // META Event(s) helpers
590  // ============================================================================================
591  /**
592   * Notice that, this only means the meta region is available on a RS, but the AM may still be
593   * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first
594   * before checking this method, unless you can make sure that your piece of code can only be
595   * executed after AM builds the region states.
596   * @see #isMetaLoaded()
597   */
598  public boolean isMetaAssigned() {
599    return metaAssignEvent.isReady();
600  }
601
602  public boolean isMetaRegionInTransition() {
603    return !isMetaAssigned();
604  }
605
606  /**
607   * Notice that this event does not mean the AM has already finished region state rebuilding. See
608   * the comment of {@link #isMetaAssigned()} for more details.
609   * @see #isMetaAssigned()
610   */
611  public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) {
612    return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc);
613  }
614
615  private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) {
616    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
617    ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo);
618    if (assigned) {
619      metaAssignEvent.wake(getProcedureScheduler());
620    } else {
621      metaAssignEvent.suspend();
622    }
623  }
624
625  private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) {
626    assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo;
627    // TODO: handle multiple meta.
628    return metaAssignEvent;
629  }
630
631  /**
632   * Wait until AM finishes the meta loading, i.e, the region states rebuilding.
633   * @see #isMetaLoaded()
634   * @see #waitMetaAssigned(Procedure, RegionInfo)
635   */
636  public boolean waitMetaLoaded(Procedure<?> proc) {
637    return metaLoadEvent.suspendIfNotReady(proc);
638  }
639
640  /**
641   * This method will be called in master initialization method after calling
642   * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign
643   * procedures for offline regions, which may be conflict with creating table.
644   * <p/>
645   * This is a bit dirty, should be reconsidered after we decide whether to keep the
646   * {@link #processOfflineRegions()} method.
647   */
648  public void wakeMetaLoadedEvent() {
649    metaLoadEvent.wake(getProcedureScheduler());
650    assert isMetaLoaded() : "expected meta to be loaded";
651  }
652
653  /**
654   * Return whether AM finishes the meta loading, i.e, the region states rebuilding.
655   * @see #isMetaAssigned()
656   * @see #waitMetaLoaded(Procedure)
657   */
658  public boolean isMetaLoaded() {
659    return metaLoadEvent.isReady();
660  }
661
662  /**
663   * Start a new thread to check if there are region servers whose versions are higher than others.
664   * If so, move all system table regions to RS with the highest version to keep compatibility. The
665   * reason is, RS in new version may not be able to access RS in old version when there are some
666   * incompatible changes.
667   * <p>
668   * This method is called when a new RegionServer is added to cluster only.
669   * </p>
670   */
671  public void checkIfShouldMoveSystemRegionAsync() {
672    // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that
673    // it should 'move' the system tables from the old server to the new server but
674    // ServerCrashProcedure is on it; and it will take care of the assign without dataloss.
675    if (this.master.getServerManager().countOfRegionServers() <= 1) {
676      return;
677    }
678    // This thread used to run whenever there was a change in the cluster. The ZooKeeper
679    // childrenChanged notification came in before the nodeDeleted message and so this method
680    // cold run before a ServerCrashProcedure could run. That meant that this thread could see
681    // a Crashed Server before ServerCrashProcedure and it could find system regions on the
682    // crashed server and go move them before ServerCrashProcedure had a chance; could be
683    // dataloss too if WALs were not recovered.
684    new Thread(() -> {
685      try {
686        synchronized (checkIfShouldMoveSystemRegionLock) {
687          List<RegionPlan> plans = new ArrayList<>();
688          // TODO: I don't think this code does a good job if all servers in cluster have same
689          // version. It looks like it will schedule unnecessary moves.
690          for (ServerName server : getExcludedServersForSystemTable()) {
691            if (master.getServerManager().isServerDead(server)) {
692              // TODO: See HBASE-18494 and HBASE-18495. Though getExcludedServersForSystemTable()
693              // considers only online servers, the server could be queued for dead server
694              // processing. As region assignments for crashed server is handled by
695              // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through
696              // regular flow of LoadBalancer as a favored node and not to have this special
697              // handling.
698              continue;
699            }
700            List<RegionInfo> regionsShouldMove = getSystemTables(server);
701            if (!regionsShouldMove.isEmpty()) {
702              for (RegionInfo regionInfo : regionsShouldMove) {
703                // null value for dest forces destination server to be selected by balancer
704                RegionPlan plan = new RegionPlan(regionInfo, server, null);
705                if (regionInfo.isMetaRegion()) {
706                  // Must move meta region first.
707                  LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(),
708                    server);
709                  moveAsync(plan);
710                } else {
711                  plans.add(plan);
712                }
713              }
714            }
715            for (RegionPlan plan : plans) {
716              LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(),
717                server);
718              moveAsync(plan);
719            }
720          }
721        }
722      } catch (Throwable t) {
723        LOG.error(t.toString(), t);
724      }
725    }).start();
726  }
727
728  private List<RegionInfo> getSystemTables(ServerName serverName) {
729    ServerStateNode serverNode = regionStates.getServerNode(serverName);
730    if (serverNode == null) {
731      return Collections.emptyList();
732    }
733    return serverNode.getSystemRegionInfoList();
734  }
735
736  private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates)
737    throws HBaseIOException {
738    if (regionNode.getProcedure() != null) {
739      throw new HBaseIOException(
740        regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId());
741    }
742    if (!regionNode.isInState(expectedStates)) {
743      throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode);
744    }
745    if (isTableDisabled(regionNode.getTable())) {
746      throw new DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode);
747    }
748  }
749
750  /**
751   * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if
752   * not appropriate UNLESS override is set. Used by hbck2 but also by straightline
753   * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}.
754   * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking
755   *      used when only when no checking needed.
756   * @param override If false, check RegionState is appropriate for assign; if not throw exception.
757   */
758  private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn,
759    boolean override) throws IOException {
760    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
761    regionNode.lock();
762    try {
763      if (override) {
764        if (regionNode.getProcedure() != null) {
765          regionNode.unsetProcedure(regionNode.getProcedure());
766        }
767      } else {
768        preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN);
769      }
770      assert regionNode.getProcedure() == null;
771      return regionNode.setProcedure(
772        TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn));
773    } finally {
774      regionNode.unlock();
775    }
776  }
777
778  /**
779   * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes
780   * appriopriate state ripe for assign.
781   * @see #createAssignProcedure(RegionInfo, ServerName, boolean)
782   */
783  private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode,
784    ServerName targetServer) {
785    regionNode.lock();
786    try {
787      return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
788        regionNode.getRegionInfo(), targetServer));
789    } finally {
790      regionNode.unlock();
791    }
792  }
793
794  public long assign(RegionInfo regionInfo, ServerName sn) throws IOException {
795    TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false);
796    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
797    return proc.getProcId();
798  }
799
800  public long assign(RegionInfo regionInfo) throws IOException {
801    return assign(regionInfo, null);
802  }
803
804  /**
805   * Submits a procedure that assigns a region to a target server without waiting for it to finish
806   * @param regionInfo the region we would like to assign
807   * @param sn         target server name
808   */
809  public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException {
810    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(),
811      createAssignProcedure(regionInfo, sn, false));
812  }
813
814  /**
815   * Submits a procedure that assigns a region without waiting for it to finish
816   * @param regionInfo the region we would like to assign
817   */
818  public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException {
819    return assignAsync(regionInfo, null);
820  }
821
822  public long unassign(RegionInfo regionInfo) throws IOException {
823    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
824    if (regionNode == null) {
825      throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName());
826    }
827    TransitRegionStateProcedure proc;
828    regionNode.lock();
829    try {
830      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
831      proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo);
832      regionNode.setProcedure(proc);
833    } finally {
834      regionNode.unlock();
835    }
836    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
837    return proc.getProcId();
838  }
839
840  public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo,
841    ServerName targetServer) throws HBaseIOException {
842    RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo);
843    if (regionNode == null) {
844      throw new UnknownRegionException(
845        "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)");
846    }
847    TransitRegionStateProcedure proc;
848    regionNode.lock();
849    try {
850      preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
851      regionNode.checkOnline();
852      proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer);
853      regionNode.setProcedure(proc);
854    } finally {
855      regionNode.unlock();
856    }
857    return proc;
858  }
859
860  public void move(RegionInfo regionInfo) throws IOException {
861    TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null);
862    ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
863  }
864
865  public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException {
866    TransitRegionStateProcedure proc =
867      createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination());
868    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
869  }
870
871  public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException {
872    ServerName current =
873      this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo());
874    if (current == null || !current.equals(regionPlan.getSource())) {
875      LOG.debug("Skip region plan {}, source server not match, current region location is {}",
876        regionPlan, current == null ? "(null)" : current);
877      return null;
878    }
879    return moveAsync(regionPlan);
880  }
881
882  // ============================================================================================
883  // RegionTransition procedures helpers
884  // ============================================================================================
885
886  /**
887   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
888   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
889   *         to populate the assigns with targets chosen using round-robin (default balancer
890   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
891   *         AssignProcedure will ask the balancer for a new target, and so on.
892   */
893  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris,
894    List<ServerName> serversToExclude) {
895    if (hris.isEmpty()) {
896      return new TransitRegionStateProcedure[0];
897    }
898
899    if (
900      serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1
901    ) {
902      LOG.debug("Only one region server found and hence going ahead with the assignment");
903      serversToExclude = null;
904    }
905    try {
906      // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do
907      // a better job if it has all the assignments in the one lump.
908      Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris,
909        this.master.getServerManager().createDestinationServersList(serversToExclude));
910      // Return mid-method!
911      return createAssignProcedures(assignments);
912    } catch (HBaseIOException hioe) {
913      LOG.warn("Failed roundRobinAssignment", hioe);
914    }
915    // If an error above, fall-through to this simpler assign. Last resort.
916    return createAssignProcedures(hris);
917  }
918
919  /**
920   * Create round-robin assigns. Use on table creation to distribute out regions across cluster.
921   * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer
922   *         to populate the assigns with targets chosen using round-robin (default balancer
923   *         scheme). If at assign-time, the target chosen is no longer up, thats fine, the
924   *         AssignProcedure will ask the balancer for a new target, and so on.
925   */
926  public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) {
927    return createRoundRobinAssignProcedures(hris, null);
928  }
929
930  static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) {
931    if (left.getRegion().isMetaRegion()) {
932      if (right.getRegion().isMetaRegion()) {
933        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
934      }
935      return -1;
936    } else if (right.getRegion().isMetaRegion()) {
937      return +1;
938    }
939    if (left.getRegion().getTable().isSystemTable()) {
940      if (right.getRegion().getTable().isSystemTable()) {
941        return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
942      }
943      return -1;
944    } else if (right.getRegion().getTable().isSystemTable()) {
945      return +1;
946    }
947    return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion());
948  }
949
950  /**
951   * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This
952   * method is called from HBCK2.
953   * @return an assign or null
954   */
955  public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) {
956    TransitRegionStateProcedure trsp = null;
957    try {
958      trsp = createAssignProcedure(ri, null, override);
959    } catch (IOException ioe) {
960      LOG.info(
961        "Failed {} assign, override={}"
962          + (override ? "" : "; set override to by-pass state checks."),
963        ri.getEncodedName(), override, ioe);
964    }
965    return trsp;
966  }
967
968  /**
969   * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2.
970   * @return an unassign or null
971   */
972  public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) {
973    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri);
974    TransitRegionStateProcedure trsp = null;
975    regionNode.lock();
976    try {
977      if (override) {
978        if (regionNode.getProcedure() != null) {
979          regionNode.unsetProcedure(regionNode.getProcedure());
980        }
981      } else {
982        // This is where we could throw an exception; i.e. override is false.
983        preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE);
984      }
985      assert regionNode.getProcedure() == null;
986      trsp =
987        TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo());
988      regionNode.setProcedure(trsp);
989    } catch (IOException ioe) {
990      // 'override' must be false here.
991      LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.",
992        ri.getEncodedName(), ioe);
993    } finally {
994      regionNode.unlock();
995    }
996    return trsp;
997  }
998
999  /**
1000   * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback
1001   * of caller is unable to do {@link #createAssignProcedures(Map)}.
1002   * <p/>
1003   * If no target server, at assign time, we will try to use the former location of the region if
1004   * one exists. This is how we 'retain' the old location across a server restart.
1005   * <p/>
1006   * Should only be called when you can make sure that no one can touch these regions other than
1007   * you. For example, when you are creating or enabling table. Presumes all Regions are in
1008   * appropriate state ripe for assign; no checking of Region state is done in here.
1009   * @see #createAssignProcedures(Map)
1010   */
1011  public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) {
1012    return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1013      .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare)
1014      .toArray(TransitRegionStateProcedure[]::new);
1015  }
1016
1017  /**
1018   * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run
1019   * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of
1020   * Region state is done in here.
1021   * @param assignments Map of assignments from which we produce an array of AssignProcedures.
1022   * @return Assignments made from the passed in <code>assignments</code>
1023   * @see #createAssignProcedures(List)
1024   */
1025  private TransitRegionStateProcedure[]
1026    createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) {
1027    return assignments.entrySet().stream()
1028      .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri))
1029        .map(regionNode -> createAssignProcedure(regionNode, e.getKey())))
1030      .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new);
1031  }
1032
1033  // for creating unassign TRSP when disabling a table or closing excess region replicas
1034  private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) {
1035    regionNode.lock();
1036    try {
1037      if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) {
1038        return null;
1039      }
1040      // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it
1041      // here for safety
1042      if (regionNode.getRegionInfo().isSplit()) {
1043        LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode);
1044        return null;
1045      }
1046      // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so
1047      // we can make sure that this procedure has not been executed yet, as TRSP will hold the
1048      // shared lock for table all the time. So here we will unset it and when it is actually
1049      // executed, it will find that the attach procedure is not itself and quit immediately.
1050      if (regionNode.getProcedure() != null) {
1051        regionNode.unsetProcedure(regionNode.getProcedure());
1052      }
1053      return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(),
1054        regionNode.getRegionInfo()));
1055    } finally {
1056      regionNode.unlock();
1057    }
1058  }
1059
1060  /**
1061   * Called by DisableTableProcedure to unassign all the regions for a table.
1062   */
1063  public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) {
1064    return regionStates.getTableRegionStateNodes(tableName).stream()
1065      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1066      .toArray(TransitRegionStateProcedure[]::new);
1067  }
1068
1069  /**
1070   * Called by ModifyTableProcedures to unassign all the excess region replicas for a table.
1071   */
1072  public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas(
1073    TableName tableName, int newReplicaCount) {
1074    return regionStates.getTableRegionStateNodes(tableName).stream()
1075      .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount)
1076      .map(this::forceCreateUnssignProcedure).filter(p -> p != null)
1077      .toArray(TransitRegionStateProcedure[]::new);
1078  }
1079
1080  public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit,
1081    final byte[] splitKey) throws IOException {
1082    return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey);
1083  }
1084
1085  public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate)
1086    throws IOException {
1087    return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate);
1088  }
1089
1090  public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException {
1091    return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false);
1092  }
1093
1094  /**
1095   * Delete the region states. This is called by "DeleteTable"
1096   */
1097  public void deleteTable(final TableName tableName) throws IOException {
1098    final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName);
1099    regionStateStore.deleteRegions(regions);
1100    for (int i = 0; i < regions.size(); ++i) {
1101      final RegionInfo regionInfo = regions.get(i);
1102      regionStates.deleteRegion(regionInfo);
1103    }
1104  }
1105
1106  // ============================================================================================
1107  // RS Region Transition Report helpers
1108  // ============================================================================================
1109  private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder,
1110    ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException {
1111    for (RegionStateTransition transition : transitionList) {
1112      switch (transition.getTransitionCode()) {
1113        case OPENED:
1114        case FAILED_OPEN:
1115        case CLOSED:
1116          assert transition.getRegionInfoCount() == 1 : transition;
1117          final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1118          long procId =
1119            transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID;
1120          updateRegionTransition(serverNode, transition.getTransitionCode(), hri,
1121            transition.hasOpenSeqNum() ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId);
1122          break;
1123        case READY_TO_SPLIT:
1124        case SPLIT:
1125        case SPLIT_REVERTED:
1126          assert transition.getRegionInfoCount() == 3 : transition;
1127          final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1128          final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1129          final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1130          updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA,
1131            splitB);
1132          break;
1133        case READY_TO_MERGE:
1134        case MERGED:
1135        case MERGE_REVERTED:
1136          assert transition.getRegionInfoCount() == 3 : transition;
1137          final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0));
1138          final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1));
1139          final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2));
1140          updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA,
1141            mergeB);
1142          break;
1143      }
1144    }
1145  }
1146
1147  public ReportRegionStateTransitionResponse reportRegionStateTransition(
1148    final ReportRegionStateTransitionRequest req) throws PleaseHoldException {
1149    ReportRegionStateTransitionResponse.Builder builder =
1150      ReportRegionStateTransitionResponse.newBuilder();
1151    ServerName serverName = ProtobufUtil.toServerName(req.getServer());
1152    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1153    if (serverNode == null) {
1154      LOG.warn("No server node for {}", serverName);
1155      builder.setErrorMessage("No server node for " + serverName);
1156      return builder.build();
1157    }
1158    // here we have to acquire a read lock instead of a simple exclusive lock. This is because that
1159    // we should not block other reportRegionStateTransition call from the same region server. This
1160    // is not only about performance, but also to prevent dead lock. Think of the meta region is
1161    // also on the same region server and you hold the lock which blocks the
1162    // reportRegionStateTransition for meta, and since meta is not online, you will block inside the
1163    // lock protection to wait for meta online...
1164    serverNode.readLock().lock();
1165    try {
1166      // we only accept reportRegionStateTransition if the region server is online, see the comment
1167      // above in submitServerCrash method and HBASE-21508 for more details.
1168      if (serverNode.isInState(ServerState.ONLINE)) {
1169        try {
1170          reportRegionStateTransition(builder, serverNode, req.getTransitionList());
1171        } catch (PleaseHoldException e) {
1172          LOG.trace("Failed transition ", e);
1173          throw e;
1174        } catch (UnsupportedOperationException | IOException e) {
1175          // TODO: at the moment we have a single error message and the RS will abort
1176          // if the master says that one of the region transitions failed.
1177          LOG.warn("Failed transition", e);
1178          builder.setErrorMessage("Failed transition " + e.getMessage());
1179        }
1180      } else {
1181        LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call",
1182          serverName);
1183        builder.setErrorMessage("You are dead");
1184      }
1185    } finally {
1186      serverNode.readLock().unlock();
1187    }
1188
1189    return builder.build();
1190  }
1191
1192  private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state,
1193    RegionInfo regionInfo, long seqId, long procId) throws IOException {
1194    checkMetaLoaded(regionInfo);
1195
1196    RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1197    if (regionNode == null) {
1198      // the table/region is gone. maybe a delete, split, merge
1199      throw new UnexpectedStateException(String.format(
1200        "Server %s was trying to transition region %s to %s. but Region is not known.",
1201        serverNode.getServerName(), regionInfo, state));
1202    }
1203    LOG.trace("Update region transition serverName={} region={} regionState={}",
1204      serverNode.getServerName(), regionNode, state);
1205
1206    regionNode.lock();
1207    try {
1208      if (!reportTransition(regionNode, serverNode, state, seqId, procId)) {
1209        // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages:
1210        // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for
1211        // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958,
1212        // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition
1213        // to CLOSED
1214        // These happen because on cluster shutdown, we currently let the RegionServers close
1215        // regions. This is the only time that region close is not run by the Master (so cluster
1216        // goes down fast). Consider changing it so Master runs all shutdowns.
1217        if (
1218          this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED)
1219        ) {
1220          LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName());
1221        } else {
1222          LOG.warn("No matching procedure found for {} transition on {} to {}",
1223            serverNode.getServerName(), regionNode, state);
1224        }
1225      }
1226    } finally {
1227      regionNode.unlock();
1228    }
1229  }
1230
1231  private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode,
1232    TransitionCode state, long seqId, long procId) throws IOException {
1233    ServerName serverName = serverNode.getServerName();
1234    TransitRegionStateProcedure proc = regionNode.getProcedure();
1235    if (proc == null) {
1236      return false;
1237    }
1238    proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode,
1239      serverName, state, seqId, procId);
1240    return true;
1241  }
1242
1243  private void updateRegionSplitTransition(final ServerStateNode serverNode,
1244    final TransitionCode state, final RegionInfo parent, final RegionInfo hriA,
1245    final RegionInfo hriB) throws IOException {
1246    checkMetaLoaded(parent);
1247
1248    if (state != TransitionCode.READY_TO_SPLIT) {
1249      throw new UnexpectedStateException(
1250        "unsupported split regionState=" + state + " for parent region " + parent
1251          + " maybe an old RS (< 2.0) had the operation in progress");
1252    }
1253
1254    // sanity check on the request
1255    if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) {
1256      throw new UnsupportedOperationException("unsupported split request with bad keys: parent="
1257        + parent + " hriA=" + hriA + " hriB=" + hriB);
1258    }
1259
1260    if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
1261      LOG.warn("Split switch is off! skip split of " + parent);
1262      throw new DoNotRetryIOException(
1263        "Split region " + parent.getRegionNameAsString() + " failed due to split switch off");
1264    }
1265
1266    // Submit the Split procedure
1267    final byte[] splitKey = hriB.getStartKey();
1268    if (LOG.isDebugEnabled()) {
1269      LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent,
1270        Bytes.toStringBinary(splitKey));
1271    }
1272    // Processing this report happens asynchronously from other activities which can mutate
1273    // the region state. For example, a split procedure may already be running for this parent.
1274    // A split procedure cannot succeed if the parent region is no longer open, so we can
1275    // ignore it in that case.
1276    // Note that submitting more than one split procedure for a given region is
1277    // harmless -- the split is fenced in the procedure handling -- but it would be noisy in
1278    // the logs. Only one procedure can succeed. The other procedure(s) would abort during
1279    // initialization and report failure with WARN level logging.
1280    RegionState parentState = regionStates.getRegionState(parent);
1281    if (parentState != null && parentState.isOpened()) {
1282      master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey));
1283    } else {
1284      LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open",
1285        serverNode.getServerName(), parent);
1286      return;
1287    }
1288
1289    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split
1290    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1291      throw new UnsupportedOperationException(
1292        String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s",
1293          parent.getShortNameToLog(), hriA, hriB));
1294    }
1295  }
1296
1297  private void updateRegionMergeTransition(final ServerStateNode serverNode,
1298    final TransitionCode state, final RegionInfo merged, final RegionInfo hriA,
1299    final RegionInfo hriB) throws IOException {
1300    checkMetaLoaded(merged);
1301
1302    if (state != TransitionCode.READY_TO_MERGE) {
1303      throw new UnexpectedStateException(
1304        "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB
1305          + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress");
1306    }
1307
1308    if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1309      LOG.warn("Merge switch is off! skip merge of regionA=" + hriA + " regionB=" + hriB);
1310      throw new DoNotRetryIOException(
1311        "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off");
1312    }
1313
1314    // Submit the Merge procedure
1315    if (LOG.isDebugEnabled()) {
1316      LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged);
1317    }
1318    master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB));
1319
1320    // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge
1321    if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) {
1322      throw new UnsupportedOperationException(
1323        String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state,
1324          merged, hriA, hriB));
1325    }
1326  }
1327
1328  // ============================================================================================
1329  // RS Status update (report online regions) helpers
1330  // ============================================================================================
1331  /**
1332   * The master will call this method when the RS send the regionServerReport(). The report will
1333   * contains the "online regions". This method will check the the online regions against the
1334   * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is
1335   * because that there is no fencing between the reportRegionStateTransition method and
1336   * regionServerReport method, so there could be race and introduce inconsistency here, but
1337   * actually there is no problem.
1338   * <p/>
1339   * Please see HBASE-21421 and HBASE-21463 for more details.
1340   */
1341  public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) {
1342    if (!isRunning()) {
1343      return;
1344    }
1345    if (LOG.isTraceEnabled()) {
1346      LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName,
1347        regionNames.size(), isMetaLoaded(),
1348        regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList()));
1349    }
1350
1351    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1352    if (serverNode == null) {
1353      LOG.warn("Got a report from server {} where its server node is null", serverName);
1354      return;
1355    }
1356    serverNode.readLock().lock();
1357    try {
1358      if (!serverNode.isInState(ServerState.ONLINE)) {
1359        LOG.warn("Got a report from a server result in state {}", serverNode);
1360        return;
1361      }
1362    } finally {
1363      serverNode.readLock().unlock();
1364    }
1365
1366    // Track the regionserver reported online regions in memory.
1367    synchronized (rsReports) {
1368      rsReports.put(serverName, regionNames);
1369    }
1370
1371    if (regionNames.isEmpty()) {
1372      // nothing to do if we don't have regions
1373      LOG.trace("no online region found on {}", serverName);
1374      return;
1375    }
1376    if (!isMetaLoaded()) {
1377      // we are still on startup, skip checking
1378      return;
1379    }
1380    // The Heartbeat tells us of what regions are on the region serve, check the state.
1381    checkOnlineRegionsReport(serverNode, regionNames);
1382  }
1383
1384  /**
1385   * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a
1386   * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not
1387   * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in
1388   * accounting.
1389   */
1390  private void closeRegionSilently(ServerName sn, byte[] regionName) {
1391    try {
1392      RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
1393      // Pass -1 for timeout. Means do not wait.
1394      ServerManager.closeRegionSilentlyAndWait(this.master.getClusterConnection(), sn, ri, -1);
1395    } catch (Exception e) {
1396      LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e);
1397    }
1398  }
1399
1400  /**
1401   * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we
1402   * will tell the RegionServer to expediently close a Region we do not think it should have.
1403   */
1404  private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) {
1405    ServerName serverName = serverNode.getServerName();
1406    for (byte[] regionName : regionNames) {
1407      if (!isRunning()) {
1408        return;
1409      }
1410      RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName);
1411      if (regionNode == null) {
1412        String regionNameAsStr = Bytes.toStringBinary(regionName);
1413        LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr,
1414          serverName);
1415        closeRegionSilently(serverNode.getServerName(), regionName);
1416        continue;
1417      }
1418      final long lag = 1000;
1419      // This is just a fallback check designed to identify unexpected data inconsistencies, so we
1420      // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the
1421      // check. This will not cause any additional problems and also prevents the regionServerReport
1422      // call from being stuck for too long which may cause deadlock on region assignment.
1423      if (regionNode.tryLock()) {
1424        try {
1425          long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate();
1426          if (regionNode.isInState(State.OPENING, State.OPEN)) {
1427            // This is possible as a region server has just closed a region but the region server
1428            // report is generated before the closing, but arrive after the closing. Make sure
1429            // there
1430            // is some elapsed time so less false alarms.
1431            if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) {
1432              LOG.warn("Reporting {} server does not match {} (time since last "
1433                + "update={}ms); closing...", serverName, regionNode, diff);
1434              closeRegionSilently(serverNode.getServerName(), regionName);
1435            }
1436          } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) {
1437            // So, we can get report that a region is CLOSED or SPLIT because a heartbeat
1438            // came in at about same time as a region transition. Make sure there is some
1439            // elapsed time so less false alarms.
1440            if (diff > lag) {
1441              LOG.warn("Reporting {} state does not match {} (time since last update={}ms)",
1442                serverName, regionNode, diff);
1443            }
1444          }
1445        } finally {
1446          regionNode.unlock();
1447        }
1448      } else {
1449        LOG.warn(
1450          "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.",
1451          regionNode);
1452      }
1453    }
1454  }
1455
1456  // ============================================================================================
1457  // RIT chore
1458  // ============================================================================================
1459  private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> {
1460    public RegionInTransitionChore(final int timeoutMsec) {
1461      super(timeoutMsec);
1462    }
1463
1464    @Override
1465    protected void periodicExecute(final MasterProcedureEnv env) {
1466      final AssignmentManager am = env.getAssignmentManager();
1467
1468      final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat();
1469      if (ritStat.hasRegionsOverThreshold()) {
1470        for (RegionState hri : ritStat.getRegionOverThreshold()) {
1471          am.handleRegionOverStuckWarningThreshold(hri.getRegion());
1472        }
1473      }
1474
1475      // update metrics
1476      am.updateRegionsInTransitionMetrics(ritStat);
1477    }
1478  }
1479
1480  private static class DeadServerMetricRegionChore
1481    extends ProcedureInMemoryChore<MasterProcedureEnv> {
1482    public DeadServerMetricRegionChore(final int timeoutMsec) {
1483      super(timeoutMsec);
1484    }
1485
1486    @Override
1487    protected void periodicExecute(final MasterProcedureEnv env) {
1488      final ServerManager sm = env.getMasterServices().getServerManager();
1489      final AssignmentManager am = env.getAssignmentManager();
1490      // To minimize inconsistencies we are not going to snapshot live servers in advance in case
1491      // new servers are added; OTOH we don't want to add heavy sync for a consistent view since
1492      // this is for metrics. Instead, we're going to check each regions as we go; to avoid making
1493      // too many checks, we maintain a local lists of server, limiting us to false negatives. If
1494      // we miss some recently-dead server, we'll just see it next time.
1495      Set<ServerName> recentlyLiveServers = new HashSet<>();
1496      int deadRegions = 0, unknownRegions = 0;
1497      for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) {
1498        if (rsn.getState() != State.OPEN) {
1499          continue; // Opportunistic check, should quickly skip RITs, offline tables, etc.
1500        }
1501        // Do not need to acquire region state lock as this is only for showing metrics.
1502        ServerName sn = rsn.getRegionLocation();
1503        State state = rsn.getState();
1504        if (state != State.OPEN) {
1505          continue; // Mostly skipping RITs that are already being take care of.
1506        }
1507        if (sn == null) {
1508          ++unknownRegions; // Opened on null?
1509          continue;
1510        }
1511        if (recentlyLiveServers.contains(sn)) {
1512          continue;
1513        }
1514        ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn);
1515        switch (sls) {
1516          case LIVE:
1517            recentlyLiveServers.add(sn);
1518            break;
1519          case DEAD:
1520            ++deadRegions;
1521            break;
1522          case UNKNOWN:
1523            ++unknownRegions;
1524            break;
1525          default:
1526            throw new AssertionError("Unexpected " + sls);
1527        }
1528      }
1529      if (deadRegions > 0 || unknownRegions > 0) {
1530        LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers",
1531          deadRegions, unknownRegions);
1532      }
1533
1534      am.updateDeadServerRegionMetrics(deadRegions, unknownRegions);
1535    }
1536  }
1537
1538  public RegionInTransitionStat computeRegionInTransitionStat() {
1539    final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration());
1540    rit.update(this);
1541    return rit;
1542  }
1543
1544  public static class RegionInTransitionStat {
1545    private final int ritThreshold;
1546
1547    private HashMap<String, RegionState> ritsOverThreshold = null;
1548    private long statTimestamp;
1549    private long oldestRITTime = 0;
1550    private int totalRITsTwiceThreshold = 0;
1551    private int totalRITs = 0;
1552
1553    public RegionInTransitionStat(final Configuration conf) {
1554      this.ritThreshold =
1555        conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD);
1556    }
1557
1558    public int getRITThreshold() {
1559      return ritThreshold;
1560    }
1561
1562    public long getTimestamp() {
1563      return statTimestamp;
1564    }
1565
1566    public int getTotalRITs() {
1567      return totalRITs;
1568    }
1569
1570    public long getOldestRITTime() {
1571      return oldestRITTime;
1572    }
1573
1574    public int getTotalRITsOverThreshold() {
1575      Map<String, RegionState> m = this.ritsOverThreshold;
1576      return m != null ? m.size() : 0;
1577    }
1578
1579    public boolean hasRegionsTwiceOverThreshold() {
1580      return totalRITsTwiceThreshold > 0;
1581    }
1582
1583    public boolean hasRegionsOverThreshold() {
1584      Map<String, RegionState> m = this.ritsOverThreshold;
1585      return m != null && !m.isEmpty();
1586    }
1587
1588    public Collection<RegionState> getRegionOverThreshold() {
1589      Map<String, RegionState> m = this.ritsOverThreshold;
1590      return m != null ? m.values() : Collections.emptySet();
1591    }
1592
1593    public boolean isRegionOverThreshold(final RegionInfo regionInfo) {
1594      Map<String, RegionState> m = this.ritsOverThreshold;
1595      return m != null && m.containsKey(regionInfo.getEncodedName());
1596    }
1597
1598    public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
1599      Map<String, RegionState> m = this.ritsOverThreshold;
1600      if (m == null) {
1601        return false;
1602      }
1603      final RegionState state = m.get(regionInfo.getEncodedName());
1604      if (state == null) {
1605        return false;
1606      }
1607      return (statTimestamp - state.getStamp()) > (ritThreshold * 2);
1608    }
1609
1610    protected void update(final AssignmentManager am) {
1611      final RegionStates regionStates = am.getRegionStates();
1612      this.statTimestamp = EnvironmentEdgeManager.currentTime();
1613      update(regionStates.getRegionsStateInTransition(), statTimestamp);
1614      update(regionStates.getRegionFailedOpen(), statTimestamp);
1615
1616      if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
1617        LOG.debug("RITs over threshold: {}",
1618          ritsOverThreshold.entrySet().stream()
1619            .map(e -> e.getKey() + ":" + e.getValue().getState().name())
1620            .collect(Collectors.joining("\n")));
1621      }
1622    }
1623
1624    private void update(final Collection<RegionState> regions, final long currentTime) {
1625      for (RegionState state : regions) {
1626        totalRITs++;
1627        final long ritTime = currentTime - state.getStamp();
1628        if (ritTime > ritThreshold) {
1629          if (ritsOverThreshold == null) {
1630            ritsOverThreshold = new HashMap<String, RegionState>();
1631          }
1632          ritsOverThreshold.put(state.getRegion().getEncodedName(), state);
1633          totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0;
1634        }
1635        if (oldestRITTime < ritTime) {
1636          oldestRITTime = ritTime;
1637        }
1638      }
1639    }
1640  }
1641
1642  private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) {
1643    metrics.updateRITOldestAge(ritStat.getOldestRITTime());
1644    metrics.updateRITCount(ritStat.getTotalRITs());
1645    metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold());
1646  }
1647
1648  private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) {
1649    metrics.updateDeadServerOpenRegions(deadRegions);
1650    metrics.updateUnknownServerOpenRegions(unknownRegions);
1651  }
1652
1653  private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) {
1654    final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo);
1655    // if (regionNode.isStuck()) {
1656    LOG.warn("STUCK Region-In-Transition {}", regionNode);
1657  }
1658
1659  // ============================================================================================
1660  // TODO: Master load/bootstrap
1661  // ============================================================================================
1662  public void joinCluster() throws IOException {
1663    long startTime = System.nanoTime();
1664    LOG.debug("Joining cluster...");
1665
1666    // Scan hbase:meta to build list of existing regions, servers, and assignment.
1667    // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress
1668    // w/o meta.
1669    loadMeta();
1670
1671    while (master.getServerManager().countOfRegionServers() < 1) {
1672      LOG.info("Waiting for RegionServers to join; current count={}",
1673        master.getServerManager().countOfRegionServers());
1674      Threads.sleep(250);
1675    }
1676    LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers());
1677
1678    // Start the chores
1679    master.getMasterProcedureExecutor().addChore(this.ritChore);
1680    master.getMasterProcedureExecutor().addChore(this.deadMetricChore);
1681
1682    long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
1683    LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs));
1684  }
1685
1686  /**
1687   * Create assign procedure for offline regions. Just follow the old
1688   * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead
1689   * server any more, we only deal with the regions in OFFLINE state in this method. And this is a
1690   * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and
1691   * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is
1692   * a legacy from long ago, as things are going really different now, maybe we do not need this
1693   * method any more. Need to revisit later.
1694   */
1695  // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
1696  // Needs to be done after the table state manager has been started.
1697  public void processOfflineRegions() {
1698    TransitRegionStateProcedure[] procs =
1699      regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
1700        .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
1701          rsn.lock();
1702          try {
1703            if (rsn.getProcedure() != null) {
1704              return null;
1705            } else {
1706              return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
1707                rsn.getRegionInfo(), null));
1708            }
1709          } finally {
1710            rsn.unlock();
1711          }
1712        }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
1713    if (procs.length > 0) {
1714      master.getMasterProcedureExecutor().submitProcedures(procs);
1715    }
1716  }
1717
1718  /*
1719   * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
1720   * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
1721   * convert Result into proper RegionInfo instances, but those would still need to be added into
1722   * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
1723   * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
1724   * AssignmentManager.regionStates.
1725   */
1726  private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {
1727
1728    @Override
1729    public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
1730      final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
1731      if (
1732        state == null && regionLocation == null && lastHost == null
1733          && openSeqNum == SequenceId.NO_SEQUENCE_ID
1734      ) {
1735        // This is a row with nothing in it.
1736        LOG.warn("Skipping empty row={}", result);
1737        return;
1738      }
1739      State localState = state;
1740      if (localState == null) {
1741        // No region state column data in hbase:meta table! Are I doing a rolling upgrade from
1742        // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
1743        // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
1744        // cases where we need to probe more to be sure this correct; TODO informed by experience.
1745        LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
1746        localState = State.OFFLINE;
1747      }
1748      RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
1749      // Do not need to lock on regionNode, as we can make sure that before we finish loading
1750      // meta, all the related procedures can not be executed. The only exception is for meta
1751      // region related operations, but here we do not load the informations for meta region.
1752      regionNode.setState(localState);
1753      regionNode.setLastHost(lastHost);
1754      regionNode.setRegionLocation(regionLocation);
1755      regionNode.setOpenSeqNum(openSeqNum);
1756
1757      // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
1758      // RIT/ServerCrash handling should take care of the transiting regions.
1759      if (
1760        localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
1761      ) {
1762        assert regionLocation != null : "found null region location for " + regionNode;
1763        // TODO: this could lead to some orphan server state nodes, as it is possible that the
1764        // region server is already dead and its SCP has already finished but we have
1765        // persisted an opening state on this region server. Finally the TRSP will assign the
1766        // region to another region server, so it will not cause critical problems, just waste
1767        // some memory as no one will try to cleanup these orphan server state nodes.
1768        regionStates.createServer(regionLocation);
1769        regionStates.addRegionToServer(regionNode);
1770      } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
1771        regionStates.addToOfflineRegions(regionNode);
1772      }
1773      if (regionNode.getProcedure() != null) {
1774        regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
1775      }
1776    }
1777  };
1778
1779  /**
1780   * Attempt to load {@code regionInfo} from META, adding any results to the
1781   * {@link #regionStateStore} Is NOT aware of replica regions.
1782   * @param regionInfo the region to be loaded from META.
1783   * @throws IOException If some error occurs while querying META or parsing results.
1784   */
1785  public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
1786    throws IOException {
1787    final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
1788      ? regionInfo.getEncodedName()
1789      : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
1790        .getEncodedName();
1791    populateRegionStatesFromMeta(regionEncodedName);
1792  }
1793
1794  /**
1795   * Attempt to load {@code regionEncodedName} from META, adding any results to the
1796   * {@link #regionStateStore} Is NOT aware of replica regions.
1797   * @param regionEncodedName encoded name for the region to be loaded from META.
1798   * @throws IOException If some error occurs while querying META or parsing results.
1799   */
1800  public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException {
1801    final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
1802    regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
1803  }
1804
1805  private void loadMeta() throws IOException {
1806    // TODO: use a thread pool
1807    regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
1808  }
1809
1810  /**
1811   * Used to check if the meta loading is done.
1812   * <p/>
1813   * if not we throw PleaseHoldException since we are rebuilding the RegionStates
1814   * @param hri region to check if it is already rebuild
1815   * @throws PleaseHoldException if meta has not been loaded yet
1816   */
1817  private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
1818    if (!isRunning()) {
1819      throw new PleaseHoldException("AssignmentManager not running");
1820    }
1821    boolean meta = isMetaRegion(hri);
1822    boolean metaLoaded = isMetaLoaded();
1823    if (!meta && !metaLoaded) {
1824      throw new PleaseHoldException(
1825        "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
1826    }
1827  }
1828
1829  // ============================================================================================
1830  // TODO: Metrics
1831  // ============================================================================================
1832  public int getNumRegionsOpened() {
1833    // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
1834    return 0;
1835  }
1836
1837  /**
1838   * Usually run by the Master in reaction to server crash during normal processing. Can also be
1839   * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
1840   * push through the SCP though context may indicate already-running-SCP (An old SCP may have
1841   * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
1842   * Servers' -- servers that are not online or in dead servers list, etc.)
1843   * @param force Set if the request came in externally over RPC (via hbck2). Force means run the
1844   *              SCP even if it seems as though there might be an outstanding SCP running.
1845   * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
1846   */
1847  public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
1848    // May be an 'Unknown Server' so handle case where serverNode is null.
1849    ServerStateNode serverNode = regionStates.getServerNode(serverName);
1850    // Remove the in-memory rsReports result
1851    synchronized (rsReports) {
1852      rsReports.remove(serverName);
1853    }
1854    if (serverNode == null) {
1855      if (force) {
1856        LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName);
1857      } else {
1858        // for normal case, do not schedule SCP if ServerStateNode is null
1859        LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName);
1860        return Procedure.NO_PROC_ID;
1861      }
1862    }
1863
1864    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
1865    // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
1866    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
1867    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
1868    // sure that, the region list fetched by SCP will not be changed any more.
1869    if (serverNode != null) {
1870      serverNode.writeLock().lock();
1871    }
1872    try {
1873
1874      boolean carryingMeta = isCarryingMeta(serverName);
1875      if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
1876        if (force) {
1877          LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}",
1878            serverNode, carryingMeta, ServerState.ONLINE);
1879        } else {
1880          LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}",
1881            serverNode, carryingMeta, ServerState.ONLINE);
1882          return Procedure.NO_PROC_ID;
1883        }
1884      }
1885      MasterProcedureEnv mpe = procExec.getEnvironment();
1886      // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
1887      // HBCKSCP scours Master in-memory state AND hbase;meta for references to
1888      // serverName just-in-case. An SCP that is scheduled when the server is
1889      // 'Unknown' probably originated externally with HBCK2 fix-it tool.
1890      ServerState oldState = null;
1891      if (serverNode != null) {
1892        oldState = serverNode.getState();
1893        serverNode.setState(ServerState.CRASHED);
1894      }
1895      ServerCrashProcedure scp = force
1896        ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)
1897        : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta);
1898      long pid = procExec.submitProcedure(scp);
1899      LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName,
1900        carryingMeta,
1901        serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
1902      return pid;
1903    } finally {
1904      if (serverNode != null) {
1905        serverNode.writeLock().unlock();
1906      }
1907    }
1908  }
1909
1910  public void offlineRegion(final RegionInfo regionInfo) {
1911    // TODO used by MasterRpcServices
1912    RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
1913    if (node != null) {
1914      node.offline();
1915    }
1916  }
1917
1918  public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
1919    // TODO used by TestSplitTransactionOnCluster.java
1920  }
1921
1922  public Map<ServerName, List<RegionInfo>>
1923    getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
1924    return regionStates.getSnapShotOfAssignment(regions);
1925  }
1926
1927  // ============================================================================================
1928  // TODO: UTILS/HELPERS?
1929  // ============================================================================================
1930  /**
1931   * Used by the client (via master) to identify if all regions have the schema updates
1932   * @return Pair indicating the status of the alter command (pending/total)
1933   */
1934  public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
1935    if (isTableDisabled(tableName)) {
1936      return new Pair<Integer, Integer>(0, 0);
1937    }
1938
1939    final List<RegionState> states = regionStates.getTableRegionStates(tableName);
1940    int ritCount = 0;
1941    for (RegionState regionState : states) {
1942      if (!regionState.isOpened() && !regionState.isSplit()) {
1943        ritCount++;
1944      }
1945    }
1946    return new Pair<Integer, Integer>(ritCount, states.size());
1947  }
1948
1949  // ============================================================================================
1950  // TODO: Region State In Transition
1951  // ============================================================================================
1952  public boolean hasRegionsInTransition() {
1953    return regionStates.hasRegionsInTransition();
1954  }
1955
1956  public List<RegionStateNode> getRegionsInTransition() {
1957    return regionStates.getRegionsInTransition();
1958  }
1959
1960  public List<RegionInfo> getAssignedRegions() {
1961    return regionStates.getAssignedRegions();
1962  }
1963
1964  /**
1965   * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}.
1966   */
1967  public RegionInfo getRegionInfo(final byte[] regionName) {
1968    final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName);
1969    return regionState != null ? regionState.getRegionInfo() : null;
1970  }
1971
1972  /**
1973   * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}.
1974   */
1975  public RegionInfo getRegionInfo(final String encodedRegionName) {
1976    final RegionStateNode regionState =
1977      regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName);
1978    return regionState != null ? regionState.getRegionInfo() : null;
1979  }
1980
1981  // ============================================================================================
1982  // Expected states on region state transition.
  // Notice that there are no expected states for transiting to OPENING state; this is because of SCP.
1984  // See the comments in regionOpening method for more details.
1985  // ============================================================================================
1986  private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case
1987    State.OPEN // Retrying
1988  };
1989
1990  private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case
1991    State.CLOSING, // Retrying
1992    State.SPLITTING, // Offline the split parent
1993    State.MERGING // Offline the merge parents
1994  };
1995
1996  private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case
1997    State.CLOSED // Retrying
1998  };
1999
2000  // This is for manually scheduled region assign, can add other states later if we find out other
2001  // usages
2002  private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE };
2003
2004  // We only allow unassign or move a region which is in OPEN state.
2005  private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN };
2006
2007  // ============================================================================================
2008  // Region Status update
2009  // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
2010  // and pre-assumptions are very tricky.
2011  // ============================================================================================
2012  private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
2013    RegionState.State... expectedStates) throws IOException {
2014    RegionState.State state = regionNode.getState();
2015    regionNode.transitionState(newState, expectedStates);
2016    boolean succ = false;
2017    try {
2018      regionStateStore.updateRegionLocation(regionNode);
2019      succ = true;
2020    } finally {
2021      if (!succ) {
2022        // revert
2023        regionNode.setState(state);
2024      }
2025    }
2026  }
2027
2028  // should be called within the synchronized block of RegionStateNode
2029  void regionOpening(RegionStateNode regionNode) throws IOException {
2030    // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
2031    // update the region state, which means that the region could be in any state when we want to
2032    // assign it after a RS crash. So here we do not pass the expectedStates parameter.
2033    transitStateAndUpdate(regionNode, State.OPENING);
2034    ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
2035    // Here the server node could be null. For example, we want to assign the region to a given
2036    // region server and it crashes, and it is the region server which holds hbase:meta, then the
2037    // above transitStateAndUpdate call will never succeed until we finishes the SCP for it. But
2038    // after the SCP finishes, the server node will be removed, so when we arrive there, the
2039    // server node will be null. This is not a big problem if we skip adding it, as later we will
2040    // fail to execute the remote procedure on the region server and then try to assign to another
2041    // region server
2042    if (serverNode != null) {
2043      serverNode.addRegion(regionNode);
2044    }
2045    // update the operation count metrics
2046    metrics.incrementOperationCounter();
2047  }
2048
2049  // should be called under the RegionStateNode lock
2050  // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
2051  // we will persist the FAILED_OPEN state into hbase:meta.
2052  void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
2053    RegionState.State state = regionNode.getState();
2054    ServerName regionLocation = regionNode.getRegionLocation();
2055    if (giveUp) {
2056      regionNode.setState(State.FAILED_OPEN);
2057      regionNode.setRegionLocation(null);
2058      boolean succ = false;
2059      try {
2060        regionStateStore.updateRegionLocation(regionNode);
2061        succ = true;
2062      } finally {
2063        if (!succ) {
2064          // revert
2065          regionNode.setState(state);
2066          regionNode.setRegionLocation(regionLocation);
2067        }
2068      }
2069    }
2070    if (regionLocation != null) {
2071      regionStates.removeRegionFromServer(regionLocation, regionNode);
2072    }
2073  }
2074
2075  // should be called under the RegionStateNode lock
2076  void regionClosing(RegionStateNode regionNode) throws IOException {
2077    transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);
2078
2079    RegionInfo hri = regionNode.getRegionInfo();
2080    // Set meta has not initialized early. so people trying to create/edit tables will wait
2081    if (isMetaRegion(hri)) {
2082      setMetaAssigned(hri, false);
2083    }
2084    // update the operation count metrics
2085    metrics.incrementOperationCounter();
2086  }
2087
  // For open and close, the transition is first persisted to the procedure store in
  // RegionRemoteProcedureBase. So here we only change the in-memory state, because the transition
  // is considered successful once it has been persisted to the procedure store; later, when the
  // RegionRemoteProcedureBase is woken up, we persist the RegionStateNode to hbase:meta.
2092
2093  // should be called under the RegionStateNode lock
2094  void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
2095    regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
2096    RegionInfo regionInfo = regionNode.getRegionInfo();
2097    regionStates.addRegionToServer(regionNode);
2098    regionStates.removeFromFailedOpen(regionInfo);
2099  }
2100
2101  // should be called under the RegionStateNode lock
2102  void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
2103    ServerName regionLocation = regionNode.getRegionLocation();
2104    regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
2105    regionNode.setRegionLocation(null);
2106    if (regionLocation != null) {
2107      regionNode.setLastHost(regionLocation);
2108      regionStates.removeRegionFromServer(regionLocation, regionNode);
2109    }
2110  }
2111
2112  // should be called under the RegionStateNode lock
2113  // for SCP
2114  public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
2115    RegionState.State state = regionNode.getState();
2116    ServerName regionLocation = regionNode.getRegionLocation();
2117    regionNode.transitionState(State.ABNORMALLY_CLOSED);
2118    regionNode.setRegionLocation(null);
2119    boolean succ = false;
2120    try {
2121      regionStateStore.updateRegionLocation(regionNode);
2122      succ = true;
2123    } finally {
2124      if (!succ) {
2125        // revert
2126        regionNode.setState(state);
2127        regionNode.setRegionLocation(regionLocation);
2128      }
2129    }
2130    if (regionLocation != null) {
2131      regionNode.setLastHost(regionLocation);
2132      regionStates.removeRegionFromServer(regionLocation, regionNode);
2133    }
2134  }
2135
2136  void persistToMeta(RegionStateNode regionNode) throws IOException {
2137    regionStateStore.updateRegionLocation(regionNode);
2138    RegionInfo regionInfo = regionNode.getRegionInfo();
2139    if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
2140      // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
2141      // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
2142      // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
2143      // on table that contains state.
2144      setMetaAssigned(regionInfo, true);
2145    }
2146  }
2147
2148  // ============================================================================================
2149  // The above methods can only be called in TransitRegionStateProcedure(and related procedures)
2150  // ============================================================================================
2151
2152  public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
2153    final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
2154    // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
2155    // The parent stays in regionStates until cleared when removed by CatalogJanitor.
2156    // Update its state in regionStates to it shows as offline and split when read
2157    // later figuring what regions are in a table and what are not: see
2158    // regionStates#getRegionsOfTable
2159    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
2160    node.setState(State.SPLIT);
2161    final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
2162    nodeA.setState(State.SPLITTING_NEW);
2163    final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
2164    nodeB.setState(State.SPLITTING_NEW);
2165
2166    // TODO: here we just update the parent region info in meta, to set split and offline to true,
2167    // without changing the one in the region node. This is a bit confusing but the region info
2168    // field in RegionStateNode is not expected to be changed in the current design. Need to find a
2169    // possible way to address this problem, or at least adding more comments about the trick to
2170    // deal with this problem, that when you want to filter out split parent, you need to check both
2171    // the RegionState on whether it is split, and also the region info. If one of them matches then
2172    // it is a split parent. And usually only one of them can match, as after restart, the region
2173    // state will be changed from SPLIT to CLOSED.
2174    regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
2175    if (shouldAssignFavoredNodes(parent)) {
2176      List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
2177      ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForDaughter(onlineServers, parent,
2178        daughterA, daughterB);
2179    }
2180  }
2181
2182  /**
2183   * When called here, the merge has happened. The merged regions have been unassigned and the above
2184   * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
2185   * The merged region will be open after this call. The merged regions are removed from hbase:meta
2186   * below. Later they are deleted from the filesystem by the catalog janitor running against
2187   * hbase:meta. It notices when the merged region no longer holds references to the old regions
2188   * (References are deleted after a compaction rewrites what the Reference points at but not until
2189   * the archiver chore runs, are the References removed).
2190   */
2191  public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
2192    RegionInfo[] mergeParents) throws IOException {
2193    final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
2194    node.setState(State.MERGED);
2195    for (RegionInfo ri : mergeParents) {
2196      regionStates.deleteRegion(ri);
2197
2198    }
2199    regionStateStore.mergeRegions(child, mergeParents, serverName);
2200    if (shouldAssignFavoredNodes(child)) {
2201      ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForMergedRegion(child,
2202        mergeParents);
2203    }
2204  }
2205
2206  /*
2207   * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
2208   * belongs to a non-system table.
2209   */
2210  private boolean shouldAssignFavoredNodes(RegionInfo region) {
2211    return this.shouldAssignRegionsWithFavoredNodes
2212      && FavoredNodesManager.isFavoredNodeApplicable(region);
2213  }
2214
2215  // ============================================================================================
2216  // Assign Queue (Assign/Balance)
2217  // ============================================================================================
2218  private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
2219  private final ReentrantLock assignQueueLock = new ReentrantLock();
2220  private final Condition assignQueueFullCond = assignQueueLock.newCondition();
2221
2222  /**
2223   * Add the assign operation to the assignment queue. The pending assignment operation will be
2224   * processed, and each region will be assigned by a server using the balancer.
2225   */
2226  protected void queueAssign(final RegionStateNode regionNode) {
2227    regionNode.getProcedureEvent().suspend();
2228
2229    // TODO: quick-start for meta and the other sys-tables?
2230    assignQueueLock.lock();
2231    try {
2232      pendingAssignQueue.add(regionNode);
2233      if (
2234        regionNode.isSystemTable() || pendingAssignQueue.size() == 1
2235          || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
2236      ) {
2237        assignQueueFullCond.signal();
2238      }
2239    } finally {
2240      assignQueueLock.unlock();
2241    }
2242  }
2243
2244  private void startAssignmentThread() {
2245    assignThread = new Thread(master.getServerName().toShortString()) {
2246      @Override
2247      public void run() {
2248        while (isRunning()) {
2249          processAssignQueue();
2250        }
2251        pendingAssignQueue.clear();
2252      }
2253    };
2254    assignThread.setDaemon(true);
2255    assignThread.start();
2256  }
2257
2258  private void stopAssignmentThread() {
2259    assignQueueSignal();
2260    try {
2261      while (assignThread.isAlive()) {
2262        assignQueueSignal();
2263        assignThread.join(250);
2264      }
2265    } catch (InterruptedException e) {
2266      LOG.warn("join interrupted", e);
2267      Thread.currentThread().interrupt();
2268    }
2269  }
2270
2271  private void assignQueueSignal() {
2272    assignQueueLock.lock();
2273    try {
2274      assignQueueFullCond.signal();
2275    } finally {
2276      assignQueueLock.unlock();
2277    }
2278  }
2279
2280  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
2281  private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
2282    HashMap<RegionInfo, RegionStateNode> regions = null;
2283
2284    assignQueueLock.lock();
2285    try {
2286      if (pendingAssignQueue.isEmpty() && isRunning()) {
2287        assignQueueFullCond.await();
2288      }
2289
2290      if (!isRunning()) {
2291        return null;
2292      }
2293      assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
2294      regions = new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size());
2295      for (RegionStateNode regionNode : pendingAssignQueue) {
2296        regions.put(regionNode.getRegionInfo(), regionNode);
2297      }
2298      pendingAssignQueue.clear();
2299    } catch (InterruptedException e) {
2300      LOG.warn("got interrupted ", e);
2301      Thread.currentThread().interrupt();
2302    } finally {
2303      assignQueueLock.unlock();
2304    }
2305    return regions;
2306  }
2307
2308  private void processAssignQueue() {
2309    final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue();
2310    if (regions == null || regions.size() == 0 || !isRunning()) {
2311      return;
2312    }
2313
2314    if (LOG.isTraceEnabled()) {
2315      LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size());
2316    }
2317
2318    // TODO: Optimize balancer. pass a RegionPlan?
2319    final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>();
2320    final List<RegionInfo> userHRIs = new ArrayList<>(regions.size());
2321    // Regions for system tables requiring reassignment
2322    final List<RegionInfo> systemHRIs = new ArrayList<>();
2323    for (RegionStateNode regionStateNode : regions.values()) {
2324      boolean sysTable = regionStateNode.isSystemTable();
2325      final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs;
2326      if (regionStateNode.getRegionLocation() != null) {
2327        retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation());
2328      } else {
2329        hris.add(regionStateNode.getRegionInfo());
2330      }
2331    }
2332
2333    // TODO: connect with the listener to invalidate the cache
2334
2335    // TODO use events
2336    List<ServerName> servers = master.getServerManager().createDestinationServersList();
2337    for (int i = 0; servers.size() < 1; ++i) {
2338      // Report every fourth time around this loop; try not to flood log.
2339      if (i % 4 == 0) {
2340        LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions.");
2341      }
2342
2343      if (!isRunning()) {
2344        LOG.debug("Stopped! Dropping assign of " + regions.size() + " queued regions.");
2345        return;
2346      }
2347      Threads.sleep(250);
2348      servers = master.getServerManager().createDestinationServersList();
2349    }
2350
2351    if (!systemHRIs.isEmpty()) {
2352      // System table regions requiring reassignment are present, get region servers
2353      // not available for system table regions
2354      final List<ServerName> excludeServers = getExcludedServersForSystemTable();
2355      List<ServerName> serversForSysTables =
2356        servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList());
2357      if (serversForSysTables.isEmpty()) {
2358        LOG.warn("Filtering old server versions and the excluded produced an empty set; "
2359          + "instead considering all candidate servers!");
2360      }
2361      LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size()
2362        + ", allServersCount=" + servers.size());
2363      processAssignmentPlans(regions, null, systemHRIs,
2364        serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs)
2365          ? servers
2366          : serversForSysTables);
2367    }
2368
2369    processAssignmentPlans(regions, retainMap, userHRIs, servers);
2370  }
2371
2372  private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions,
2373    List<RegionInfo> hirs) {
2374    for (RegionInfo ri : hirs) {
2375      if (
2376        regions.get(ri).getRegionLocation() != null
2377          && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME)
2378      ) {
2379        return true;
2380      }
2381    }
2382    return false;
2383  }
2384
2385  private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions,
2386    final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris,
2387    final List<ServerName> servers) {
2388    boolean isTraceEnabled = LOG.isTraceEnabled();
2389    if (isTraceEnabled) {
2390      LOG.trace("Available servers count=" + servers.size() + ": " + servers);
2391    }
2392
2393    final LoadBalancer balancer = getBalancer();
2394    // ask the balancer where to place regions
2395    if (retainMap != null && !retainMap.isEmpty()) {
2396      if (isTraceEnabled) {
2397        LOG.trace("retain assign regions=" + retainMap);
2398      }
2399      try {
2400        acceptPlan(regions, balancer.retainAssignment(retainMap, servers));
2401      } catch (HBaseIOException e) {
2402        LOG.warn("unable to retain assignment", e);
2403        addToPendingAssignment(regions, retainMap.keySet());
2404      }
2405    }
2406
2407    // TODO: Do we need to split retain and round-robin?
2408    // the retain seems to fallback to round-robin/random if the region is not in the map.
2409    if (!hris.isEmpty()) {
2410      Collections.sort(hris, RegionInfo.COMPARATOR);
2411      if (isTraceEnabled) {
2412        LOG.trace("round robin regions=" + hris);
2413      }
2414      try {
2415        acceptPlan(regions, balancer.roundRobinAssignment(hris, servers));
2416      } catch (HBaseIOException e) {
2417        LOG.warn("unable to round-robin assignment", e);
2418        addToPendingAssignment(regions, hris);
2419      }
2420    }
2421  }
2422
2423  private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions,
2424    final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException {
2425    final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()];
2426    final long st = EnvironmentEdgeManager.currentTime();
2427
2428    if (plan.isEmpty()) {
2429      throw new HBaseIOException("unable to compute plans for regions=" + regions.size());
2430    }
2431
2432    int evcount = 0;
2433    for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) {
2434      final ServerName server = entry.getKey();
2435      for (RegionInfo hri : entry.getValue()) {
2436        final RegionStateNode regionNode = regions.get(hri);
2437        regionNode.setRegionLocation(server);
2438        if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) {
2439          assignQueueLock.lock();
2440          try {
2441            pendingAssignQueue.add(regionNode);
2442          } finally {
2443            assignQueueLock.unlock();
2444          }
2445        } else {
2446          events[evcount++] = regionNode.getProcedureEvent();
2447        }
2448      }
2449    }
2450    ProcedureEvent.wakeEvents(getProcedureScheduler(), events);
2451
2452    final long et = EnvironmentEdgeManager.currentTime();
2453    if (LOG.isTraceEnabled()) {
2454      LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st));
2455    }
2456  }
2457
2458  private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions,
2459    final Collection<RegionInfo> pendingRegions) {
2460    assignQueueLock.lock();
2461    try {
2462      for (RegionInfo hri : pendingRegions) {
2463        pendingAssignQueue.add(regions.get(hri));
2464      }
2465    } finally {
2466      assignQueueLock.unlock();
2467    }
2468  }
2469
2470  /**
2471   * For a given cluster with mixed versions of servers, get a list of servers with lower versions,
2472   * where system table regions should not be assigned to. For system table, we must assign regions
2473   * to a server with highest version. However, we can disable this exclusion using config:
2474   * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation
2475   * available with definition of minVersionToMoveSysTables.
2476   * @return List of Excluded servers for System table regions.
2477   */
2478  public List<ServerName> getExcludedServersForSystemTable() {
2479    // TODO: This should be a cached list kept by the ServerManager rather than calculated on each
2480    // move or system region assign. The RegionServerTracker keeps list of online Servers with
2481    // RegionServerInfo that includes Version.
2482    List<Pair<ServerName, String>> serverList =
2483      master.getServerManager().getOnlineServersList().stream()
2484        .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList());
2485    if (serverList.isEmpty()) {
2486      return new ArrayList<>();
2487    }
2488    String highestVersion = Collections
2489      .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond()))
2490      .getSecond();
2491    if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) {
2492      int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion);
2493      if (comparedValue > 0) {
2494        return new ArrayList<>();
2495      }
2496    }
2497    return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion))
2498      .map(Pair::getFirst).collect(Collectors.toList());
2499  }
2500
2501  MasterServices getMaster() {
2502    return master;
2503  }
2504
2505  /** Returns a snapshot of rsReports */
2506  public Map<ServerName, Set<byte[]>> getRSReports() {
2507    Map<ServerName, Set<byte[]>> rsReportsSnapshot = new HashMap<>();
2508    synchronized (rsReports) {
2509      rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue()));
2510    }
2511    return rsReportsSnapshot;
2512  }
2513
2514  /**
2515   * Provide regions state count for given table. e.g howmany regions of give table are
2516   * opened/closed/rit etc
2517   * @param tableName TableName
2518   * @return region states count
2519   */
2520  public RegionStatesCount getRegionStatesCount(TableName tableName) {
2521    int openRegionsCount = 0;
2522    int closedRegionCount = 0;
2523    int ritCount = 0;
2524    int splitRegionCount = 0;
2525    int totalRegionCount = 0;
2526    if (!isTableDisabled(tableName)) {
2527      final List<RegionState> states = regionStates.getTableRegionStates(tableName);
2528      for (RegionState regionState : states) {
2529        if (regionState.isOpened()) {
2530          openRegionsCount++;
2531        } else if (regionState.isClosed()) {
2532          closedRegionCount++;
2533        } else if (regionState.isSplit()) {
2534          splitRegionCount++;
2535        }
2536      }
2537      totalRegionCount = states.size();
2538      ritCount = totalRegionCount - openRegionsCount - splitRegionCount;
2539    }
2540    return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount)
2541      .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount)
2542      .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build();
2543  }
2544
2545}