001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.Future; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.locks.Condition; 034import java.util.concurrent.locks.ReentrantLock; 035import java.util.stream.Collectors; 036import java.util.stream.Stream; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.DoNotRetryIOException; 039import org.apache.hadoop.hbase.HBaseIOException; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.MetaTableAccessor; 042import org.apache.hadoop.hbase.PleaseHoldException; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.UnknownRegionException; 046import 
org.apache.hadoop.hbase.client.DoNotRetryRegionException; 047import org.apache.hadoop.hbase.client.MasterSwitchType; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.client.RegionReplicaUtil; 051import org.apache.hadoop.hbase.client.RegionStatesCount; 052import org.apache.hadoop.hbase.client.Result; 053import org.apache.hadoop.hbase.client.ResultScanner; 054import org.apache.hadoop.hbase.client.Scan; 055import org.apache.hadoop.hbase.client.TableState; 056import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; 057import org.apache.hadoop.hbase.favored.FavoredNodesManager; 058import org.apache.hadoop.hbase.favored.FavoredNodesPromoter; 059import org.apache.hadoop.hbase.master.LoadBalancer; 060import org.apache.hadoop.hbase.master.MasterServices; 061import org.apache.hadoop.hbase.master.MetricsAssignmentManager; 062import org.apache.hadoop.hbase.master.RegionPlan; 063import org.apache.hadoop.hbase.master.RegionState; 064import org.apache.hadoop.hbase.master.RegionState.State; 065import org.apache.hadoop.hbase.master.ServerManager; 066import org.apache.hadoop.hbase.master.TableStateManager; 067import org.apache.hadoop.hbase.master.balancer.FavoredStochasticBalancer; 068import org.apache.hadoop.hbase.master.procedure.HBCKServerCrashProcedure; 069import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 070import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 071import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 072import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; 073import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure; 074import org.apache.hadoop.hbase.master.region.MasterRegion; 075import org.apache.hadoop.hbase.procedure2.Procedure; 076import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 077import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 078import 
org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore; 079import org.apache.hadoop.hbase.procedure2.util.StringUtils; 080import org.apache.hadoop.hbase.regionserver.SequenceId; 081import org.apache.hadoop.hbase.util.Bytes; 082import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 083import org.apache.hadoop.hbase.util.Pair; 084import org.apache.hadoop.hbase.util.Threads; 085import org.apache.hadoop.hbase.util.VersionInfo; 086import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 087import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 088import org.apache.yetus.audience.InterfaceAudience; 089import org.apache.zookeeper.KeeperException; 090import org.slf4j.Logger; 091import org.slf4j.LoggerFactory; 092 093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; 098 099/** 100 * The AssignmentManager is the coordinator for region assign/unassign operations. 101 * <ul> 102 * <li>In-memory states of regions and servers are stored in {@link RegionStates}.</li> 103 * <li>hbase:meta state updates are handled by {@link RegionStateStore}.</li> 104 * </ul> 105 * Regions are created by CreateTable, Split, Merge. Regions are deleted by DeleteTable, Split, 106 * Merge. Assigns are triggered by CreateTable, EnableTable, Split, Merge, ServerCrash. 
Unassigns 107 * are triggered by DisableTable, Split, Merge 108 */ 109@InterfaceAudience.Private 110public class AssignmentManager { 111 private static final Logger LOG = LoggerFactory.getLogger(AssignmentManager.class); 112 113 // TODO: AMv2 114 // - handle region migration from hbase1 to hbase2. 115 // - handle sys table assignment first (e.g. acl, namespace) 116 // - handle table priorities 117 // - If ServerBusyException trying to update hbase:meta, we abort the Master 118 // See updateRegionLocation in RegionStateStore. 119 // 120 // See also 121 // https://docs.google.com/document/d/1eVKa7FHdeoJ1-9o8yZcOTAQbv0u0bblBlCCzVSIn69g/edit#heading=h.ystjyrkbtoq5 122 // for other TODOs. 123 124 public static final String BOOTSTRAP_THREAD_POOL_SIZE_CONF_KEY = 125 "hbase.assignment.bootstrap.thread.pool.size"; 126 127 public static final String ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY = 128 "hbase.assignment.dispatch.wait.msec"; 129 private static final int DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC = 150; 130 131 public static final String ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY = 132 "hbase.assignment.dispatch.wait.queue.max.size"; 133 private static final int DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX = 100; 134 135 public static final String RIT_CHORE_INTERVAL_MSEC_CONF_KEY = 136 "hbase.assignment.rit.chore.interval.msec"; 137 private static final int DEFAULT_RIT_CHORE_INTERVAL_MSEC = 60 * 1000; 138 139 public static final String DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY = 140 "hbase.assignment.dead.region.metric.chore.interval.msec"; 141 private static final int DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC = 120 * 1000; 142 143 public static final String ASSIGN_MAX_ATTEMPTS = "hbase.assignment.maximum.attempts"; 144 private static final int DEFAULT_ASSIGN_MAX_ATTEMPTS = Integer.MAX_VALUE; 145 146 public static final String ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 147 "hbase.assignment.retry.immediately.maximum.attempts"; 148 private static final int 
DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS = 3; 149 150 /** Region in Transition metrics threshold time */ 151 public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = 152 "hbase.metrics.rit.stuck.warning.threshold"; 153 private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000; 154 public static final String UNEXPECTED_STATE_REGION = "Unexpected state for "; 155 156 public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force"; 157 158 public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false; 159 160 /** The wait time in millis before checking again if the region's previous RS is back online */ 161 public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 162 "hbase.master.scp.retain.assignment.force.wait-interval"; 163 164 public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50; 165 166 /** 167 * The number of times to check if the region's previous RS is back online, before giving up and 168 * proceeding with assignment on a new RS 169 */ 170 public static final String FORCE_REGION_RETAINMENT_RETRIES = 171 "hbase.master.scp.retain.assignment.force.retries"; 172 173 public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600; 174 175 private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign"); 176 private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load"); 177 178 private final MetricsAssignmentManager metrics; 179 private final RegionInTransitionChore ritChore; 180 private final DeadServerMetricRegionChore deadMetricChore; 181 private final MasterServices master; 182 183 private final AtomicBoolean running = new AtomicBoolean(false); 184 private final RegionStates regionStates = new RegionStates(); 185 private final RegionStateStore regionStateStore; 186 187 /** 188 * When the operator uses this configuration option, any version between the current cluster 189 * version and the value of 
"hbase.min.version.move.system.tables" does not trigger any 190 * auto-region movement. Auto-region movement here refers to auto-migration of system table 191 * regions to newer server versions. It is assumed that the configured range of versions does not 192 * require special handling of moving system table regions to higher versioned RegionServer. This 193 * auto-migration is done by {@link #checkIfShouldMoveSystemRegionAsync()}. Example: Let's assume 194 * the cluster is on version 1.4.0 and we have set "hbase.min.version.move.system.tables" as 195 * "2.0.0". Now if we upgrade one RegionServer on 1.4.0 cluster to 1.6.0 (< 2.0.0), then 196 * AssignmentManager will not move hbase:meta, hbase:namespace and other system table regions to 197 * newly brought up RegionServer 1.6.0 as part of auto-migration. However, if we upgrade one 198 * RegionServer on 1.4.0 cluster to 2.2.0 (> 2.0.0), then AssignmentManager will move all system 199 * table regions to newly brought up RegionServer 2.2.0 as part of auto-migration done by 200 * {@link #checkIfShouldMoveSystemRegionAsync()}. "hbase.min.version.move.system.tables" is 201 * introduced as part of HBASE-22923. 
202 */ 203 private final String minVersionToMoveSysTables; 204 205 private static final String MIN_VERSION_MOVE_SYS_TABLES_CONFIG = 206 "hbase.min.version.move.system.tables"; 207 private static final String DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG = ""; 208 209 private final Map<ServerName, Set<byte[]>> rsReports = new HashMap<>(); 210 211 private final boolean shouldAssignRegionsWithFavoredNodes; 212 private final int assignDispatchWaitQueueMaxSize; 213 private final int assignDispatchWaitMillis; 214 private final int assignMaxAttempts; 215 private final int assignRetryImmediatelyMaxAttempts; 216 217 private final MasterRegion masterRegion; 218 219 private final Object checkIfShouldMoveSystemRegionLock = new Object(); 220 221 private Thread assignThread; 222 223 private final boolean forceRegionRetainment; 224 225 private final long forceRegionRetainmentWaitInterval; 226 227 private final int forceRegionRetainmentRetries; 228 229 public AssignmentManager(MasterServices master, MasterRegion masterRegion) { 230 this(master, masterRegion, new RegionStateStore(master, masterRegion)); 231 } 232 233 AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) { 234 this.master = master; 235 this.regionStateStore = stateStore; 236 this.metrics = new MetricsAssignmentManager(); 237 this.masterRegion = masterRegion; 238 239 final Configuration conf = master.getConfiguration(); 240 241 // Only read favored nodes if using the favored nodes load balancer. 
242 this.shouldAssignRegionsWithFavoredNodes = FavoredStochasticBalancer.class 243 .isAssignableFrom(conf.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class)); 244 245 this.assignDispatchWaitMillis = 246 conf.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC); 247 this.assignDispatchWaitQueueMaxSize = 248 conf.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY, DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX); 249 250 this.assignMaxAttempts = 251 Math.max(1, conf.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS)); 252 this.assignRetryImmediatelyMaxAttempts = conf.getInt(ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, 253 DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS); 254 255 int ritChoreInterval = 256 conf.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY, DEFAULT_RIT_CHORE_INTERVAL_MSEC); 257 this.ritChore = new RegionInTransitionChore(ritChoreInterval); 258 259 int deadRegionChoreInterval = conf.getInt(DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY, 260 DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC); 261 if (deadRegionChoreInterval > 0) { 262 this.deadMetricChore = new DeadServerMetricRegionChore(deadRegionChoreInterval); 263 } else { 264 this.deadMetricChore = null; 265 } 266 minVersionToMoveSysTables = 267 conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG); 268 269 forceRegionRetainment = 270 conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT); 271 forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 272 DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL); 273 forceRegionRetainmentRetries = 274 conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES); 275 } 276 277 private void mirrorMetaLocations() throws IOException, KeeperException { 278 // For compatibility, mirror the meta region state to zookeeper 279 // And we still need to use zookeeper to publish the meta region locations to region 280 // server, so they can serve as 
ClientMetaService 281 ZKWatcher zk = master.getZooKeeper(); 282 if (zk == null || !zk.getRecoverableZooKeeper().getState().isAlive()) { 283 // this is possible in tests, we do not provide a zk watcher or the zk watcher has been closed 284 return; 285 } 286 Collection<RegionStateNode> metaStates = regionStates.getRegionStateNodes(); 287 for (RegionStateNode metaState : metaStates) { 288 MetaTableLocator.setMetaLocation(zk, metaState.getRegionLocation(), 289 metaState.getRegionInfo().getReplicaId(), metaState.getState()); 290 } 291 int replicaCount = metaStates.size(); 292 // remove extra mirror locations 293 for (String znode : zk.getMetaReplicaNodes()) { 294 int replicaId = zk.getZNodePaths().getMetaReplicaIdFromZNode(znode); 295 if (replicaId >= replicaCount) { 296 MetaTableLocator.deleteMetaLocation(zk, replicaId); 297 } 298 } 299 } 300 301 public void start() throws IOException, KeeperException { 302 if (!running.compareAndSet(false, true)) { 303 return; 304 } 305 306 LOG.trace("Starting assignment manager"); 307 308 // Start the Assignment Thread 309 startAssignmentThread(); 310 // load meta region states. 311 // here we are still in the early steps of active master startup. There is only one thread(us) 312 // can access AssignmentManager and create region node, so here we do not need to lock the 313 // region node. 
314 try (ResultScanner scanner = 315 masterRegion.getScanner(new Scan().addFamily(HConstants.CATALOG_FAMILY))) { 316 for (;;) { 317 Result result = scanner.next(); 318 if (result == null) { 319 break; 320 } 321 RegionStateStore 322 .visitMetaEntry((r, regionInfo, state, regionLocation, lastHost, openSeqNum) -> { 323 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 324 regionNode.setState(state); 325 regionNode.setLastHost(lastHost); 326 regionNode.setRegionLocation(regionLocation); 327 regionNode.setOpenSeqNum(openSeqNum); 328 if (regionNode.getProcedure() != null) { 329 regionNode.getProcedure().stateLoaded(this, regionNode); 330 } 331 if (regionLocation != null) { 332 // TODO: this could lead to some orphan server state nodes, as it is possible that the 333 // region server is already dead and its SCP has already finished but we have 334 // persisted an opening state on this region server. Finally the TRSP will assign the 335 // region to another region server, so it will not cause critical problems, just waste 336 // some memory as no one will try to cleanup these orphan server state nodes. 337 regionStates.createServer(regionLocation); 338 regionStates.addRegionToServer(regionNode); 339 } 340 if (RegionReplicaUtil.isDefaultReplica(regionInfo.getReplicaId())) { 341 setMetaAssigned(regionInfo, state == State.OPEN); 342 } 343 LOG.debug("Loaded hbase:meta {}", regionNode); 344 }, result); 345 } 346 } 347 mirrorMetaLocations(); 348 } 349 350 /** 351 * Create RegionStateNode based on the TRSP list, and attach the TRSP to the RegionStateNode. 352 * <p> 353 * This is used to restore the RIT region list, so we do not need to restore it in the loadingMeta 354 * method below. And it is also very important as now before submitting a TRSP, we need to attach 355 * it to the RegionStateNode, which acts like a guard, so we need to restore this information at 356 * the very beginning, before we start processing any procedures. 
357 */ 358 public void setupRIT(List<TransitRegionStateProcedure> procs) { 359 procs.forEach(proc -> { 360 RegionInfo regionInfo = proc.getRegion(); 361 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 362 TransitRegionStateProcedure existingProc = regionNode.getProcedure(); 363 if (existingProc != null) { 364 // This is possible, as we will detach the procedure from the RSN before we 365 // actually finish the procedure. This is because that, we will detach the TRSP from the RSN 366 // during execution, at that time, the procedure has not been marked as done in the pv2 367 // framework yet, so it is possible that we schedule a new TRSP immediately and when 368 // arriving here, we will find out that there are multiple TRSPs for the region. But we can 369 // make sure that, only the last one can take the charge, the previous ones should have all 370 // been finished already. So here we will compare the proc id, the greater one will win. 371 if (existingProc.getProcId() < proc.getProcId()) { 372 // the new one wins, unset and set it to the new one below 373 regionNode.unsetProcedure(existingProc); 374 } else { 375 // the old one wins, skip 376 return; 377 } 378 } 379 LOG.info("Attach {} to {} to restore RIT", proc, regionNode); 380 regionNode.setProcedure(proc); 381 }); 382 } 383 384 public void stop() { 385 if (!running.compareAndSet(true, false)) { 386 return; 387 } 388 389 LOG.info("Stopping assignment manager"); 390 391 // The AM is started before the procedure executor, 392 // but the actual work will be loaded/submitted only once we have the executor 393 final boolean hasProcExecutor = master.getMasterProcedureExecutor() != null; 394 395 // Remove the RIT chore 396 if (hasProcExecutor) { 397 master.getMasterProcedureExecutor().removeChore(this.ritChore); 398 if (this.deadMetricChore != null) { 399 master.getMasterProcedureExecutor().removeChore(this.deadMetricChore); 400 } 401 } 402 403 // Stop the Assignment Thread 404 
stopAssignmentThread(); 405 406 // Stop the RegionStateStore 407 regionStates.clear(); 408 409 // Update meta events (for testing) 410 if (hasProcExecutor) { 411 metaLoadEvent.suspend(); 412 for (RegionInfo hri : getMetaRegionSet()) { 413 setMetaAssigned(hri, false); 414 } 415 } 416 } 417 418 public boolean isRunning() { 419 return running.get(); 420 } 421 422 public Configuration getConfiguration() { 423 return master.getConfiguration(); 424 } 425 426 public MetricsAssignmentManager getAssignmentManagerMetrics() { 427 return metrics; 428 } 429 430 private LoadBalancer getBalancer() { 431 return master.getLoadBalancer(); 432 } 433 434 private MasterProcedureEnv getProcedureEnvironment() { 435 return master.getMasterProcedureExecutor().getEnvironment(); 436 } 437 438 private MasterProcedureScheduler getProcedureScheduler() { 439 return getProcedureEnvironment().getProcedureScheduler(); 440 } 441 442 int getAssignMaxAttempts() { 443 return assignMaxAttempts; 444 } 445 446 public boolean isForceRegionRetainment() { 447 return forceRegionRetainment; 448 } 449 450 public long getForceRegionRetainmentWaitInterval() { 451 return forceRegionRetainmentWaitInterval; 452 } 453 454 public int getForceRegionRetainmentRetries() { 455 return forceRegionRetainmentRetries; 456 } 457 458 int getAssignRetryImmediatelyMaxAttempts() { 459 return assignRetryImmediatelyMaxAttempts; 460 } 461 462 public RegionStates getRegionStates() { 463 return regionStates; 464 } 465 466 /** 467 * Returns the regions hosted by the specified server. 468 * <p/> 469 * Notice that, for SCP, after we submit the SCP, no one can change the region list for the 470 * ServerStateNode so we do not need any locks here. 
And for other usage, this can only give you a 471 * snapshot of the current region list for this server, which means, right after you get the 472 * region list, new regions may be moved to this server or some regions may be moved out from this 473 * server, so you should not use it critically if you need strong consistency. 474 */ 475 public List<RegionInfo> getRegionsOnServer(ServerName serverName) { 476 ServerStateNode serverInfo = regionStates.getServerNode(serverName); 477 if (serverInfo == null) { 478 return Collections.emptyList(); 479 } 480 return serverInfo.getRegionInfoList(); 481 } 482 483 private RegionInfo getRegionInfo(RegionStateNode rsn) { 484 if (rsn.isSplit() && !rsn.getRegionInfo().isSplit()) { 485 // see the comments in markRegionAsSplit on why we need to do this converting. 486 return RegionInfoBuilder.newBuilder(rsn.getRegionInfo()).setSplit(true).setOffline(true) 487 .build(); 488 } else { 489 return rsn.getRegionInfo(); 490 } 491 } 492 493 private Stream<RegionStateNode> getRegionStateNodes(TableName tableName, 494 boolean excludeOfflinedSplitParents) { 495 Stream<RegionStateNode> stream = regionStates.getTableRegionStateNodes(tableName).stream(); 496 if (excludeOfflinedSplitParents) { 497 return stream.filter(rsn -> !rsn.isSplit()); 498 } else { 499 return stream; 500 } 501 } 502 503 public List<RegionInfo> getTableRegions(TableName tableName, 504 boolean excludeOfflinedSplitParents) { 505 return getRegionStateNodes(tableName, excludeOfflinedSplitParents).map(this::getRegionInfo) 506 .collect(Collectors.toList()); 507 } 508 509 public List<Pair<RegionInfo, ServerName>> getTableRegionsAndLocations(TableName tableName, 510 boolean excludeOfflinedSplitParents) { 511 return getRegionStateNodes(tableName, excludeOfflinedSplitParents) 512 .map(rsn -> Pair.newPair(getRegionInfo(rsn), rsn.getRegionLocation())) 513 .collect(Collectors.toList()); 514 } 515 516 public RegionStateStore getRegionStateStore() { 517 return regionStateStore; 518 } 519 520 
public List<ServerName> getFavoredNodes(final RegionInfo regionInfo) { 521 return this.shouldAssignRegionsWithFavoredNodes 522 ? ((FavoredStochasticBalancer) getBalancer()).getFavoredNodes(regionInfo) 523 : ServerName.EMPTY_SERVER_LIST; 524 } 525 526 // ============================================================================================ 527 // Table State Manager helpers 528 // ============================================================================================ 529 private TableStateManager getTableStateManager() { 530 return master.getTableStateManager(); 531 } 532 533 private boolean isTableEnabled(final TableName tableName) { 534 return getTableStateManager().isTableState(tableName, TableState.State.ENABLED); 535 } 536 537 private boolean isTableDisabled(final TableName tableName) { 538 return getTableStateManager().isTableState(tableName, TableState.State.DISABLED, 539 TableState.State.DISABLING); 540 } 541 542 // ============================================================================================ 543 // META Helpers 544 // ============================================================================================ 545 private boolean isMetaRegion(final RegionInfo regionInfo) { 546 return regionInfo.isMetaRegion(); 547 } 548 549 public boolean isMetaRegion(final byte[] regionName) { 550 return getMetaRegionFromName(regionName) != null; 551 } 552 553 public RegionInfo getMetaRegionFromName(final byte[] regionName) { 554 for (RegionInfo hri : getMetaRegionSet()) { 555 if (Bytes.equals(hri.getRegionName(), regionName)) { 556 return hri; 557 } 558 } 559 return null; 560 } 561 562 public boolean isCarryingMeta(final ServerName serverName) { 563 // TODO: handle multiple meta 564 return isCarryingRegion(serverName, RegionInfoBuilder.FIRST_META_REGIONINFO); 565 } 566 567 private boolean isCarryingRegion(final ServerName serverName, final RegionInfo regionInfo) { 568 // TODO: check for state? 
569 final RegionStateNode node = regionStates.getRegionStateNode(regionInfo); 570 return (node != null && serverName.equals(node.getRegionLocation())); 571 } 572 573 private RegionInfo getMetaForRegion(final RegionInfo regionInfo) { 574 // if (regionInfo.isMetaRegion()) return regionInfo; 575 // TODO: handle multiple meta. if the region provided is not meta lookup 576 // which meta the region belongs to. 577 return RegionInfoBuilder.FIRST_META_REGIONINFO; 578 } 579 580 // TODO: handle multiple meta. 581 private static final Set<RegionInfo> META_REGION_SET = 582 Collections.singleton(RegionInfoBuilder.FIRST_META_REGIONINFO); 583 584 public Set<RegionInfo> getMetaRegionSet() { 585 return META_REGION_SET; 586 } 587 588 // ============================================================================================ 589 // META Event(s) helpers 590 // ============================================================================================ 591 /** 592 * Notice that, this only means the meta region is available on a RS, but the AM may still be 593 * loading the region states from meta, so usually you need to check {@link #isMetaLoaded()} first 594 * before checking this method, unless you can make sure that your piece of code can only be 595 * executed after AM builds the region states. 596 * @see #isMetaLoaded() 597 */ 598 public boolean isMetaAssigned() { 599 return metaAssignEvent.isReady(); 600 } 601 602 public boolean isMetaRegionInTransition() { 603 return !isMetaAssigned(); 604 } 605 606 /** 607 * Notice that this event does not mean the AM has already finished region state rebuilding. See 608 * the comment of {@link #isMetaAssigned()} for more details. 
609 * @see #isMetaAssigned() 610 */ 611 public boolean waitMetaAssigned(Procedure<?> proc, RegionInfo regionInfo) { 612 return getMetaAssignEvent(getMetaForRegion(regionInfo)).suspendIfNotReady(proc); 613 } 614 615 private void setMetaAssigned(RegionInfo metaRegionInfo, boolean assigned) { 616 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 617 ProcedureEvent<?> metaAssignEvent = getMetaAssignEvent(metaRegionInfo); 618 if (assigned) { 619 metaAssignEvent.wake(getProcedureScheduler()); 620 } else { 621 metaAssignEvent.suspend(); 622 } 623 } 624 625 private ProcedureEvent<?> getMetaAssignEvent(RegionInfo metaRegionInfo) { 626 assert isMetaRegion(metaRegionInfo) : "unexpected non-meta region " + metaRegionInfo; 627 // TODO: handle multiple meta. 628 return metaAssignEvent; 629 } 630 631 /** 632 * Wait until AM finishes the meta loading, i.e, the region states rebuilding. 633 * @see #isMetaLoaded() 634 * @see #waitMetaAssigned(Procedure, RegionInfo) 635 */ 636 public boolean waitMetaLoaded(Procedure<?> proc) { 637 return metaLoadEvent.suspendIfNotReady(proc); 638 } 639 640 /** 641 * This method will be called in master initialization method after calling 642 * {@link #processOfflineRegions()}, as in processOfflineRegions we will generate assign 643 * procedures for offline regions, which may be conflict with creating table. 644 * <p/> 645 * This is a bit dirty, should be reconsidered after we decide whether to keep the 646 * {@link #processOfflineRegions()} method. 647 */ 648 public void wakeMetaLoadedEvent() { 649 metaLoadEvent.wake(getProcedureScheduler()); 650 assert isMetaLoaded() : "expected meta to be loaded"; 651 } 652 653 /** 654 * Return whether AM finishes the meta loading, i.e, the region states rebuilding. 
655 * @see #isMetaAssigned() 656 * @see #waitMetaLoaded(Procedure) 657 */ 658 public boolean isMetaLoaded() { 659 return metaLoadEvent.isReady(); 660 } 661 662 /** 663 * Start a new thread to check if there are region servers whose versions are higher than others. 664 * If so, move all system table regions to RS with the highest version to keep compatibility. The 665 * reason is, RS in new version may not be able to access RS in old version when there are some 666 * incompatible changes. 667 * <p> 668 * This method is called when a new RegionServer is added to cluster only. 669 * </p> 670 */ 671 public void checkIfShouldMoveSystemRegionAsync() { 672 // TODO: Fix this thread. If a server is killed and a new one started, this thread thinks that 673 // it should 'move' the system tables from the old server to the new server but 674 // ServerCrashProcedure is on it; and it will take care of the assign without dataloss. 675 if (this.master.getServerManager().countOfRegionServers() <= 1) { 676 return; 677 } 678 // This thread used to run whenever there was a change in the cluster. The ZooKeeper 679 // childrenChanged notification came in before the nodeDeleted message and so this method 680 // cold run before a ServerCrashProcedure could run. That meant that this thread could see 681 // a Crashed Server before ServerCrashProcedure and it could find system regions on the 682 // crashed server and go move them before ServerCrashProcedure had a chance; could be 683 // dataloss too if WALs were not recovered. 684 new Thread(() -> { 685 try { 686 synchronized (checkIfShouldMoveSystemRegionLock) { 687 List<RegionPlan> plans = new ArrayList<>(); 688 // TODO: I don't think this code does a good job if all servers in cluster have same 689 // version. It looks like it will schedule unnecessary moves. 690 for (ServerName server : getExcludedServersForSystemTable()) { 691 if (master.getServerManager().isServerDead(server)) { 692 // TODO: See HBASE-18494 and HBASE-18495. 
Though getExcludedServersForSystemTable() 693 // considers only online servers, the server could be queued for dead server 694 // processing. As region assignments for crashed server is handled by 695 // ServerCrashProcedure, do NOT handle them here. The goal is to handle this through 696 // regular flow of LoadBalancer as a favored node and not to have this special 697 // handling. 698 continue; 699 } 700 List<RegionInfo> regionsShouldMove = getSystemTables(server); 701 if (!regionsShouldMove.isEmpty()) { 702 for (RegionInfo regionInfo : regionsShouldMove) { 703 // null value for dest forces destination server to be selected by balancer 704 RegionPlan plan = new RegionPlan(regionInfo, server, null); 705 if (regionInfo.isMetaRegion()) { 706 // Must move meta region first. 707 LOG.info("Async MOVE of {} to newer Server={}", regionInfo.getEncodedName(), 708 server); 709 moveAsync(plan); 710 } else { 711 plans.add(plan); 712 } 713 } 714 } 715 for (RegionPlan plan : plans) { 716 LOG.info("Async MOVE of {} to newer Server={}", plan.getRegionInfo().getEncodedName(), 717 server); 718 moveAsync(plan); 719 } 720 } 721 } 722 } catch (Throwable t) { 723 LOG.error(t.toString(), t); 724 } 725 }).start(); 726 } 727 728 private List<RegionInfo> getSystemTables(ServerName serverName) { 729 ServerStateNode serverNode = regionStates.getServerNode(serverName); 730 if (serverNode == null) { 731 return Collections.emptyList(); 732 } 733 return serverNode.getSystemRegionInfoList(); 734 } 735 736 private void preTransitCheck(RegionStateNode regionNode, RegionState.State[] expectedStates) 737 throws HBaseIOException { 738 if (regionNode.getProcedure() != null) { 739 throw new HBaseIOException( 740 regionNode + " is currently in transition; pid=" + regionNode.getProcedure().getProcId()); 741 } 742 if (!regionNode.isInState(expectedStates)) { 743 throw new DoNotRetryRegionException(UNEXPECTED_STATE_REGION + regionNode); 744 } 745 if (isTableDisabled(regionNode.getTable())) { 746 throw new 
DoNotRetryIOException(regionNode.getTable() + " is disabled for " + regionNode); 747 } 748 } 749 750 /** 751 * Create an assign TransitRegionStateProcedure. Makes sure of RegionState. Throws exception if 752 * not appropriate UNLESS override is set. Used by hbck2 but also by straightline 753 * {@link #assign(RegionInfo, ServerName)} and {@link #assignAsync(RegionInfo, ServerName)}. 754 * @see #createAssignProcedure(RegionStateNode, ServerName) for a version that does NO checking 755 * used when only when no checking needed. 756 * @param override If false, check RegionState is appropriate for assign; if not throw exception. 757 */ 758 private TransitRegionStateProcedure createAssignProcedure(RegionInfo regionInfo, ServerName sn, 759 boolean override) throws IOException { 760 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo); 761 regionNode.lock(); 762 try { 763 if (override) { 764 if (regionNode.getProcedure() != null) { 765 regionNode.unsetProcedure(regionNode.getProcedure()); 766 } 767 } else { 768 preTransitCheck(regionNode, STATES_EXPECTED_ON_ASSIGN); 769 } 770 assert regionNode.getProcedure() == null; 771 return regionNode.setProcedure( 772 TransitRegionStateProcedure.assign(getProcedureEnvironment(), regionInfo, sn)); 773 } finally { 774 regionNode.unlock(); 775 } 776 } 777 778 /** 779 * Create an assign TransitRegionStateProcedure. Does NO checking of RegionState. Presumes 780 * appriopriate state ripe for assign. 
781 * @see #createAssignProcedure(RegionInfo, ServerName, boolean) 782 */ 783 private TransitRegionStateProcedure createAssignProcedure(RegionStateNode regionNode, 784 ServerName targetServer) { 785 regionNode.lock(); 786 try { 787 return regionNode.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(), 788 regionNode.getRegionInfo(), targetServer)); 789 } finally { 790 regionNode.unlock(); 791 } 792 } 793 794 public long assign(RegionInfo regionInfo, ServerName sn) throws IOException { 795 TransitRegionStateProcedure proc = createAssignProcedure(regionInfo, sn, false); 796 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 797 return proc.getProcId(); 798 } 799 800 public long assign(RegionInfo regionInfo) throws IOException { 801 return assign(regionInfo, null); 802 } 803 804 /** 805 * Submits a procedure that assigns a region to a target server without waiting for it to finish 806 * @param regionInfo the region we would like to assign 807 * @param sn target server name 808 */ 809 public Future<byte[]> assignAsync(RegionInfo regionInfo, ServerName sn) throws IOException { 810 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), 811 createAssignProcedure(regionInfo, sn, false)); 812 } 813 814 /** 815 * Submits a procedure that assigns a region without waiting for it to finish 816 * @param regionInfo the region we would like to assign 817 */ 818 public Future<byte[]> assignAsync(RegionInfo regionInfo) throws IOException { 819 return assignAsync(regionInfo, null); 820 } 821 822 public long unassign(RegionInfo regionInfo) throws IOException { 823 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 824 if (regionNode == null) { 825 throw new UnknownRegionException("No RegionState found for " + regionInfo.getEncodedName()); 826 } 827 TransitRegionStateProcedure proc; 828 regionNode.lock(); 829 try { 830 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 
831 proc = TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionInfo); 832 regionNode.setProcedure(proc); 833 } finally { 834 regionNode.unlock(); 835 } 836 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 837 return proc.getProcId(); 838 } 839 840 public TransitRegionStateProcedure createMoveRegionProcedure(RegionInfo regionInfo, 841 ServerName targetServer) throws HBaseIOException { 842 RegionStateNode regionNode = this.regionStates.getRegionStateNode(regionInfo); 843 if (regionNode == null) { 844 throw new UnknownRegionException( 845 "No RegionStateNode found for " + regionInfo.getEncodedName() + "(Closed/Deleted?)"); 846 } 847 TransitRegionStateProcedure proc; 848 regionNode.lock(); 849 try { 850 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 851 regionNode.checkOnline(); 852 proc = TransitRegionStateProcedure.move(getProcedureEnvironment(), regionInfo, targetServer); 853 regionNode.setProcedure(proc); 854 } finally { 855 regionNode.unlock(); 856 } 857 return proc; 858 } 859 860 public void move(RegionInfo regionInfo) throws IOException { 861 TransitRegionStateProcedure proc = createMoveRegionProcedure(regionInfo, null); 862 ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc); 863 } 864 865 public Future<byte[]> moveAsync(RegionPlan regionPlan) throws HBaseIOException { 866 TransitRegionStateProcedure proc = 867 createMoveRegionProcedure(regionPlan.getRegionInfo(), regionPlan.getDestination()); 868 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 869 } 870 871 public Future<byte[]> balance(RegionPlan regionPlan) throws HBaseIOException { 872 ServerName current = 873 this.getRegionStates().getRegionAssignments().get(regionPlan.getRegionInfo()); 874 if (current == null || !current.equals(regionPlan.getSource())) { 875 LOG.debug("Skip region plan {}, source server not match, current region location is {}", 876 regionPlan, 
current == null ? "(null)" : current); 877 return null; 878 } 879 return moveAsync(regionPlan); 880 } 881 882 // ============================================================================================ 883 // RegionTransition procedures helpers 884 // ============================================================================================ 885 886 /** 887 * Create round-robin assigns. Use on table creation to distribute out regions across cluster. 888 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 889 * to populate the assigns with targets chosen using round-robin (default balancer 890 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 891 * AssignProcedure will ask the balancer for a new target, and so on. 892 */ 893 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris, 894 List<ServerName> serversToExclude) { 895 if (hris.isEmpty()) { 896 return new TransitRegionStateProcedure[0]; 897 } 898 899 if ( 900 serversToExclude != null && this.master.getServerManager().getOnlineServersList().size() == 1 901 ) { 902 LOG.debug("Only one region server found and hence going ahead with the assignment"); 903 serversToExclude = null; 904 } 905 try { 906 // Ask the balancer to assign our regions. Pass the regions en masse. The balancer can do 907 // a better job if it has all the assignments in the one lump. 908 Map<ServerName, List<RegionInfo>> assignments = getBalancer().roundRobinAssignment(hris, 909 this.master.getServerManager().createDestinationServersList(serversToExclude)); 910 // Return mid-method! 911 return createAssignProcedures(assignments); 912 } catch (HBaseIOException hioe) { 913 LOG.warn("Failed roundRobinAssignment", hioe); 914 } 915 // If an error above, fall-through to this simpler assign. Last resort. 916 return createAssignProcedures(hris); 917 } 918 919 /** 920 * Create round-robin assigns. 
Use on table creation to distribute out regions across cluster. 921 * @return AssignProcedures made out of the passed in <code>hris</code> and a call to the balancer 922 * to populate the assigns with targets chosen using round-robin (default balancer 923 * scheme). If at assign-time, the target chosen is no longer up, thats fine, the 924 * AssignProcedure will ask the balancer for a new target, and so on. 925 */ 926 public TransitRegionStateProcedure[] createRoundRobinAssignProcedures(List<RegionInfo> hris) { 927 return createRoundRobinAssignProcedures(hris, null); 928 } 929 930 static int compare(TransitRegionStateProcedure left, TransitRegionStateProcedure right) { 931 if (left.getRegion().isMetaRegion()) { 932 if (right.getRegion().isMetaRegion()) { 933 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 934 } 935 return -1; 936 } else if (right.getRegion().isMetaRegion()) { 937 return +1; 938 } 939 if (left.getRegion().getTable().isSystemTable()) { 940 if (right.getRegion().getTable().isSystemTable()) { 941 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 942 } 943 return -1; 944 } else if (right.getRegion().getTable().isSystemTable()) { 945 return +1; 946 } 947 return RegionInfo.COMPARATOR.compare(left.getRegion(), right.getRegion()); 948 } 949 950 /** 951 * Create one TransitRegionStateProcedure to assign a region w/o specifying a target server. This 952 * method is called from HBCK2. 953 * @return an assign or null 954 */ 955 public TransitRegionStateProcedure createOneAssignProcedure(RegionInfo ri, boolean override) { 956 TransitRegionStateProcedure trsp = null; 957 try { 958 trsp = createAssignProcedure(ri, null, override); 959 } catch (IOException ioe) { 960 LOG.info( 961 "Failed {} assign, override={}" 962 + (override ? 
"" : "; set override to by-pass state checks."), 963 ri.getEncodedName(), override, ioe); 964 } 965 return trsp; 966 } 967 968 /** 969 * Create one TransitRegionStateProcedure to unassign a region. This method is called from HBCK2. 970 * @return an unassign or null 971 */ 972 public TransitRegionStateProcedure createOneUnassignProcedure(RegionInfo ri, boolean override) { 973 RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(ri); 974 TransitRegionStateProcedure trsp = null; 975 regionNode.lock(); 976 try { 977 if (override) { 978 if (regionNode.getProcedure() != null) { 979 regionNode.unsetProcedure(regionNode.getProcedure()); 980 } 981 } else { 982 // This is where we could throw an exception; i.e. override is false. 983 preTransitCheck(regionNode, STATES_EXPECTED_ON_UNASSIGN_OR_MOVE); 984 } 985 assert regionNode.getProcedure() == null; 986 trsp = 987 TransitRegionStateProcedure.unassign(getProcedureEnvironment(), regionNode.getRegionInfo()); 988 regionNode.setProcedure(trsp); 989 } catch (IOException ioe) { 990 // 'override' must be false here. 991 LOG.info("Failed {} unassign, override=false; set override to by-pass state checks.", 992 ri.getEncodedName(), ioe); 993 } finally { 994 regionNode.unlock(); 995 } 996 return trsp; 997 } 998 999 /** 1000 * Create an array of TransitRegionStateProcedure w/o specifying a target server. Used as fallback 1001 * of caller is unable to do {@link #createAssignProcedures(Map)}. 1002 * <p/> 1003 * If no target server, at assign time, we will try to use the former location of the region if 1004 * one exists. This is how we 'retain' the old location across a server restart. 1005 * <p/> 1006 * Should only be called when you can make sure that no one can touch these regions other than 1007 * you. For example, when you are creating or enabling table. Presumes all Regions are in 1008 * appropriate state ripe for assign; no checking of Region state is done in here. 
1009 * @see #createAssignProcedures(Map) 1010 */ 1011 public TransitRegionStateProcedure[] createAssignProcedures(List<RegionInfo> hris) { 1012 return hris.stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1013 .map(regionNode -> createAssignProcedure(regionNode, null)).sorted(AssignmentManager::compare) 1014 .toArray(TransitRegionStateProcedure[]::new); 1015 } 1016 1017 /** 1018 * Tied to {@link #createAssignProcedures(List)} in that it is called if caller is unable to run 1019 * this method. Presumes all Regions are in appropriate state ripe for assign; no checking of 1020 * Region state is done in here. 1021 * @param assignments Map of assignments from which we produce an array of AssignProcedures. 1022 * @return Assignments made from the passed in <code>assignments</code> 1023 * @see #createAssignProcedures(List) 1024 */ 1025 private TransitRegionStateProcedure[] 1026 createAssignProcedures(Map<ServerName, List<RegionInfo>> assignments) { 1027 return assignments.entrySet().stream() 1028 .flatMap(e -> e.getValue().stream().map(hri -> regionStates.getOrCreateRegionStateNode(hri)) 1029 .map(regionNode -> createAssignProcedure(regionNode, e.getKey()))) 1030 .sorted(AssignmentManager::compare).toArray(TransitRegionStateProcedure[]::new); 1031 } 1032 1033 // for creating unassign TRSP when disabling a table or closing excess region replicas 1034 private TransitRegionStateProcedure forceCreateUnssignProcedure(RegionStateNode regionNode) { 1035 regionNode.lock(); 1036 try { 1037 if (regionNode.isInState(State.OFFLINE, State.CLOSED, State.SPLIT)) { 1038 return null; 1039 } 1040 // in general, a split parent should be in CLOSED or SPLIT state, but anyway, let's check it 1041 // here for safety 1042 if (regionNode.getRegionInfo().isSplit()) { 1043 LOG.warn("{} is a split parent but not in CLOSED or SPLIT state", regionNode); 1044 return null; 1045 } 1046 // As in DisableTableProcedure or ModifyTableProcedure, we will hold the xlock for table, so 1047 // we 
can make sure that this procedure has not been executed yet, as TRSP will hold the 1048 // shared lock for table all the time. So here we will unset it and when it is actually 1049 // executed, it will find that the attach procedure is not itself and quit immediately. 1050 if (regionNode.getProcedure() != null) { 1051 regionNode.unsetProcedure(regionNode.getProcedure()); 1052 } 1053 return regionNode.setProcedure(TransitRegionStateProcedure.unassign(getProcedureEnvironment(), 1054 regionNode.getRegionInfo())); 1055 } finally { 1056 regionNode.unlock(); 1057 } 1058 } 1059 1060 /** 1061 * Called by DisableTableProcedure to unassign all the regions for a table. 1062 */ 1063 public TransitRegionStateProcedure[] createUnassignProceduresForDisabling(TableName tableName) { 1064 return regionStates.getTableRegionStateNodes(tableName).stream() 1065 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 1066 .toArray(TransitRegionStateProcedure[]::new); 1067 } 1068 1069 /** 1070 * Called by ModifyTableProcedures to unassign all the excess region replicas for a table. 
1071 */ 1072 public TransitRegionStateProcedure[] createUnassignProceduresForClosingExcessRegionReplicas( 1073 TableName tableName, int newReplicaCount) { 1074 return regionStates.getTableRegionStateNodes(tableName).stream() 1075 .filter(regionNode -> regionNode.getRegionInfo().getReplicaId() >= newReplicaCount) 1076 .map(this::forceCreateUnssignProcedure).filter(p -> p != null) 1077 .toArray(TransitRegionStateProcedure[]::new); 1078 } 1079 1080 public SplitTableRegionProcedure createSplitProcedure(final RegionInfo regionToSplit, 1081 final byte[] splitKey) throws IOException { 1082 return new SplitTableRegionProcedure(getProcedureEnvironment(), regionToSplit, splitKey); 1083 } 1084 1085 public TruncateRegionProcedure createTruncateRegionProcedure(final RegionInfo regionToTruncate) 1086 throws IOException { 1087 return new TruncateRegionProcedure(getProcedureEnvironment(), regionToTruncate); 1088 } 1089 1090 public MergeTableRegionsProcedure createMergeProcedure(RegionInfo... ris) throws IOException { 1091 return new MergeTableRegionsProcedure(getProcedureEnvironment(), ris, false); 1092 } 1093 1094 /** 1095 * Delete the region states. 
This is called by "DeleteTable" 1096 */ 1097 public void deleteTable(final TableName tableName) throws IOException { 1098 final ArrayList<RegionInfo> regions = regionStates.getTableRegionsInfo(tableName); 1099 regionStateStore.deleteRegions(regions); 1100 for (int i = 0; i < regions.size(); ++i) { 1101 final RegionInfo regionInfo = regions.get(i); 1102 regionStates.deleteRegion(regionInfo); 1103 } 1104 } 1105 1106 // ============================================================================================ 1107 // RS Region Transition Report helpers 1108 // ============================================================================================ 1109 private void reportRegionStateTransition(ReportRegionStateTransitionResponse.Builder builder, 1110 ServerStateNode serverNode, List<RegionStateTransition> transitionList) throws IOException { 1111 for (RegionStateTransition transition : transitionList) { 1112 switch (transition.getTransitionCode()) { 1113 case OPENED: 1114 case FAILED_OPEN: 1115 case CLOSED: 1116 assert transition.getRegionInfoCount() == 1 : transition; 1117 final RegionInfo hri = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1118 long procId = 1119 transition.getProcIdCount() > 0 ? transition.getProcId(0) : Procedure.NO_PROC_ID; 1120 updateRegionTransition(serverNode, transition.getTransitionCode(), hri, 1121 transition.hasOpenSeqNum() ? 
transition.getOpenSeqNum() : HConstants.NO_SEQNUM, procId); 1122 break; 1123 case READY_TO_SPLIT: 1124 case SPLIT: 1125 case SPLIT_REVERTED: 1126 assert transition.getRegionInfoCount() == 3 : transition; 1127 final RegionInfo parent = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1128 final RegionInfo splitA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1129 final RegionInfo splitB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1130 updateRegionSplitTransition(serverNode, transition.getTransitionCode(), parent, splitA, 1131 splitB); 1132 break; 1133 case READY_TO_MERGE: 1134 case MERGED: 1135 case MERGE_REVERTED: 1136 assert transition.getRegionInfoCount() == 3 : transition; 1137 final RegionInfo merged = ProtobufUtil.toRegionInfo(transition.getRegionInfo(0)); 1138 final RegionInfo mergeA = ProtobufUtil.toRegionInfo(transition.getRegionInfo(1)); 1139 final RegionInfo mergeB = ProtobufUtil.toRegionInfo(transition.getRegionInfo(2)); 1140 updateRegionMergeTransition(serverNode, transition.getTransitionCode(), merged, mergeA, 1141 mergeB); 1142 break; 1143 } 1144 } 1145 } 1146 1147 public ReportRegionStateTransitionResponse reportRegionStateTransition( 1148 final ReportRegionStateTransitionRequest req) throws PleaseHoldException { 1149 ReportRegionStateTransitionResponse.Builder builder = 1150 ReportRegionStateTransitionResponse.newBuilder(); 1151 ServerName serverName = ProtobufUtil.toServerName(req.getServer()); 1152 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1153 if (serverNode == null) { 1154 LOG.warn("No server node for {}", serverName); 1155 builder.setErrorMessage("No server node for " + serverName); 1156 return builder.build(); 1157 } 1158 // here we have to acquire a read lock instead of a simple exclusive lock. This is because that 1159 // we should not block other reportRegionStateTransition call from the same region server. This 1160 // is not only about performance, but also to prevent dead lock. 
Think of the meta region is 1161 // also on the same region server and you hold the lock which blocks the 1162 // reportRegionStateTransition for meta, and since meta is not online, you will block inside the 1163 // lock protection to wait for meta online... 1164 serverNode.readLock().lock(); 1165 try { 1166 // we only accept reportRegionStateTransition if the region server is online, see the comment 1167 // above in submitServerCrash method and HBASE-21508 for more details. 1168 if (serverNode.isInState(ServerState.ONLINE)) { 1169 try { 1170 reportRegionStateTransition(builder, serverNode, req.getTransitionList()); 1171 } catch (PleaseHoldException e) { 1172 LOG.trace("Failed transition ", e); 1173 throw e; 1174 } catch (UnsupportedOperationException | IOException e) { 1175 // TODO: at the moment we have a single error message and the RS will abort 1176 // if the master says that one of the region transitions failed. 1177 LOG.warn("Failed transition", e); 1178 builder.setErrorMessage("Failed transition " + e.getMessage()); 1179 } 1180 } else { 1181 LOG.warn("The region server {} is already dead, skip reportRegionStateTransition call", 1182 serverName); 1183 builder.setErrorMessage("You are dead"); 1184 } 1185 } finally { 1186 serverNode.readLock().unlock(); 1187 } 1188 1189 return builder.build(); 1190 } 1191 1192 private void updateRegionTransition(ServerStateNode serverNode, TransitionCode state, 1193 RegionInfo regionInfo, long seqId, long procId) throws IOException { 1194 checkMetaLoaded(regionInfo); 1195 1196 RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1197 if (regionNode == null) { 1198 // the table/region is gone. maybe a delete, split, merge 1199 throw new UnexpectedStateException(String.format( 1200 "Server %s was trying to transition region %s to %s. 
but Region is not known.", 1201 serverNode.getServerName(), regionInfo, state)); 1202 } 1203 LOG.trace("Update region transition serverName={} region={} regionState={}", 1204 serverNode.getServerName(), regionNode, state); 1205 1206 regionNode.lock(); 1207 try { 1208 if (!reportTransition(regionNode, serverNode, state, seqId, procId)) { 1209 // Don't log WARN if shutting down cluster; during shutdown. Avoid the below messages: 1210 // 2018-08-13 10:45:10,551 WARN ...AssignmentManager: No matching procedure found for 1211 // rit=OPEN, location=ve0538.halxg.cloudera.com,16020,1533493000958, 1212 // table=IntegrationTestBigLinkedList, region=65ab289e2fc1530df65f6c3d7cde7aa5 transition 1213 // to CLOSED 1214 // These happen because on cluster shutdown, we currently let the RegionServers close 1215 // regions. This is the only time that region close is not run by the Master (so cluster 1216 // goes down fast). Consider changing it so Master runs all shutdowns. 1217 if ( 1218 this.master.getServerManager().isClusterShutdown() && state.equals(TransitionCode.CLOSED) 1219 ) { 1220 LOG.info("RegionServer {} {}", state, regionNode.getRegionInfo().getEncodedName()); 1221 } else { 1222 LOG.warn("No matching procedure found for {} transition on {} to {}", 1223 serverNode.getServerName(), regionNode, state); 1224 } 1225 } 1226 } finally { 1227 regionNode.unlock(); 1228 } 1229 } 1230 1231 private boolean reportTransition(RegionStateNode regionNode, ServerStateNode serverNode, 1232 TransitionCode state, long seqId, long procId) throws IOException { 1233 ServerName serverName = serverNode.getServerName(); 1234 TransitRegionStateProcedure proc = regionNode.getProcedure(); 1235 if (proc == null) { 1236 return false; 1237 } 1238 proc.reportTransition(master.getMasterProcedureExecutor().getEnvironment(), regionNode, 1239 serverName, state, seqId, procId); 1240 return true; 1241 } 1242 1243 private void updateRegionSplitTransition(final ServerStateNode serverNode, 1244 final 
TransitionCode state, final RegionInfo parent, final RegionInfo hriA, 1245 final RegionInfo hriB) throws IOException { 1246 checkMetaLoaded(parent); 1247 1248 if (state != TransitionCode.READY_TO_SPLIT) { 1249 throw new UnexpectedStateException( 1250 "unsupported split regionState=" + state + " for parent region " + parent 1251 + " maybe an old RS (< 2.0) had the operation in progress"); 1252 } 1253 1254 // sanity check on the request 1255 if (!Bytes.equals(hriA.getEndKey(), hriB.getStartKey())) { 1256 throw new UnsupportedOperationException("unsupported split request with bad keys: parent=" 1257 + parent + " hriA=" + hriA + " hriB=" + hriB); 1258 } 1259 1260 if (!master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) { 1261 LOG.warn("Split switch is off! skip split of " + parent); 1262 throw new DoNotRetryIOException( 1263 "Split region " + parent.getRegionNameAsString() + " failed due to split switch off"); 1264 } 1265 1266 // Submit the Split procedure 1267 final byte[] splitKey = hriB.getStartKey(); 1268 if (LOG.isDebugEnabled()) { 1269 LOG.debug("Split request from {}, parent={}, splitKey={}", serverNode.getServerName(), parent, 1270 Bytes.toStringBinary(splitKey)); 1271 } 1272 // Processing this report happens asynchronously from other activities which can mutate 1273 // the region state. For example, a split procedure may already be running for this parent. 1274 // A split procedure cannot succeed if the parent region is no longer open, so we can 1275 // ignore it in that case. 1276 // Note that submitting more than one split procedure for a given region is 1277 // harmless -- the split is fenced in the procedure handling -- but it would be noisy in 1278 // the logs. Only one procedure can succeed. The other procedure(s) would abort during 1279 // initialization and report failure with WARN level logging. 
1280 RegionState parentState = regionStates.getRegionState(parent); 1281 if (parentState != null && parentState.isOpened()) { 1282 master.getMasterProcedureExecutor().submitProcedure(createSplitProcedure(parent, splitKey)); 1283 } else { 1284 LOG.info("Ignoring split request from {}, parent={} because parent is unknown or not open", 1285 serverNode.getServerName(), parent); 1286 return; 1287 } 1288 1289 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the split 1290 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1291 throw new UnsupportedOperationException( 1292 String.format("Split handled by the master: " + "parent=%s hriA=%s hriB=%s", 1293 parent.getShortNameToLog(), hriA, hriB)); 1294 } 1295 } 1296 1297 private void updateRegionMergeTransition(final ServerStateNode serverNode, 1298 final TransitionCode state, final RegionInfo merged, final RegionInfo hriA, 1299 final RegionInfo hriB) throws IOException { 1300 checkMetaLoaded(merged); 1301 1302 if (state != TransitionCode.READY_TO_MERGE) { 1303 throw new UnexpectedStateException( 1304 "Unsupported merge regionState=" + state + " for regionA=" + hriA + " regionB=" + hriB 1305 + " merged=" + merged + " maybe an old RS (< 2.0) had the operation in progress"); 1306 } 1307 1308 if (!master.isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { 1309 LOG.warn("Merge switch is off! 
skip merge of regionA=" + hriA + " regionB=" + hriB); 1310 throw new DoNotRetryIOException( 1311 "Merge of regionA=" + hriA + " regionB=" + hriB + " failed because merge switch is off"); 1312 } 1313 1314 // Submit the Merge procedure 1315 if (LOG.isDebugEnabled()) { 1316 LOG.debug("Handling merge request from RS=" + merged + ", merged=" + merged); 1317 } 1318 master.getMasterProcedureExecutor().submitProcedure(createMergeProcedure(hriA, hriB)); 1319 1320 // If the RS is < 2.0 throw an exception to abort the operation, we are handling the merge 1321 if (master.getServerManager().getVersionNumber(serverNode.getServerName()) < 0x0200000) { 1322 throw new UnsupportedOperationException( 1323 String.format("Merge not handled yet: regionState=%s merged=%s hriA=%s hriB=%s", state, 1324 merged, hriA, hriB)); 1325 } 1326 } 1327 1328 // ============================================================================================ 1329 // RS Status update (report online regions) helpers 1330 // ============================================================================================ 1331 /** 1332 * The master will call this method when the RS send the regionServerReport(). The report will 1333 * contains the "online regions". This method will check the the online regions against the 1334 * in-memory state of the AM, and we will log a warn message if there is a mismatch. This is 1335 * because that there is no fencing between the reportRegionStateTransition method and 1336 * regionServerReport method, so there could be race and introduce inconsistency here, but 1337 * actually there is no problem. 1338 * <p/> 1339 * Please see HBASE-21421 and HBASE-21463 for more details. 
1340 */ 1341 public void reportOnlineRegions(ServerName serverName, Set<byte[]> regionNames) { 1342 if (!isRunning()) { 1343 return; 1344 } 1345 if (LOG.isTraceEnabled()) { 1346 LOG.trace("ReportOnlineRegions {} regionCount={}, metaLoaded={} {}", serverName, 1347 regionNames.size(), isMetaLoaded(), 1348 regionNames.stream().map(Bytes::toStringBinary).collect(Collectors.toList())); 1349 } 1350 1351 ServerStateNode serverNode = regionStates.getServerNode(serverName); 1352 if (serverNode == null) { 1353 LOG.warn("Got a report from server {} where its server node is null", serverName); 1354 return; 1355 } 1356 serverNode.readLock().lock(); 1357 try { 1358 if (!serverNode.isInState(ServerState.ONLINE)) { 1359 LOG.warn("Got a report from a server result in state {}", serverNode); 1360 return; 1361 } 1362 } finally { 1363 serverNode.readLock().unlock(); 1364 } 1365 1366 // Track the regionserver reported online regions in memory. 1367 synchronized (rsReports) { 1368 rsReports.put(serverName, regionNames); 1369 } 1370 1371 if (regionNames.isEmpty()) { 1372 // nothing to do if we don't have regions 1373 LOG.trace("no online region found on {}", serverName); 1374 return; 1375 } 1376 if (!isMetaLoaded()) { 1377 // we are still on startup, skip checking 1378 return; 1379 } 1380 // The Heartbeat tells us of what regions are on the region serve, check the state. 1381 checkOnlineRegionsReport(serverNode, regionNames); 1382 } 1383 1384 /** 1385 * Close <code>regionName</code> on <code>sn</code> silently and immediately without using a 1386 * Procedure or going via hbase:meta. For case where a RegionServer's hosting of a Region is not 1387 * aligned w/ the Master's accounting of Region state. This is for cleaning up an error in 1388 * accounting. 1389 */ 1390 private void closeRegionSilently(ServerName sn, byte[] regionName) { 1391 try { 1392 RegionInfo ri = MetaTableAccessor.parseRegionInfoFromRegionName(regionName); 1393 // Pass -1 for timeout. Means do not wait. 
1394 ServerManager.closeRegionSilentlyAndWait(this.master.getClusterConnection(), sn, ri, -1); 1395 } catch (Exception e) { 1396 LOG.error("Failed trying to close {} on {}", Bytes.toStringBinary(regionName), sn, e); 1397 } 1398 } 1399 1400 /** 1401 * Check that what the RegionServer reports aligns with the Master's image. If disagreement, we 1402 * will tell the RegionServer to expediently close a Region we do not think it should have. 1403 */ 1404 private void checkOnlineRegionsReport(ServerStateNode serverNode, Set<byte[]> regionNames) { 1405 ServerName serverName = serverNode.getServerName(); 1406 for (byte[] regionName : regionNames) { 1407 if (!isRunning()) { 1408 return; 1409 } 1410 RegionStateNode regionNode = regionStates.getRegionStateNodeFromName(regionName); 1411 if (regionNode == null) { 1412 String regionNameAsStr = Bytes.toStringBinary(regionName); 1413 LOG.warn("No RegionStateNode for {} but reported as up on {}; closing...", regionNameAsStr, 1414 serverName); 1415 closeRegionSilently(serverNode.getServerName(), regionName); 1416 continue; 1417 } 1418 final long lag = 1000; 1419 // This is just a fallback check designed to identify unexpected data inconsistencies, so we 1420 // use tryLock to attempt to acquire the lock, and if the lock cannot be acquired, we skip the 1421 // check. This will not cause any additional problems and also prevents the regionServerReport 1422 // call from being stuck for too long which may cause deadlock on region assignment. 1423 if (regionNode.tryLock()) { 1424 try { 1425 long diff = EnvironmentEdgeManager.currentTime() - regionNode.getLastUpdate(); 1426 if (regionNode.isInState(State.OPENING, State.OPEN)) { 1427 // This is possible as a region server has just closed a region but the region server 1428 // report is generated before the closing, but arrive after the closing. Make sure 1429 // there 1430 // is some elapsed time so less false alarms. 
1431 if (!regionNode.getRegionLocation().equals(serverName) && diff > lag) { 1432 LOG.warn("Reporting {} server does not match {} (time since last " 1433 + "update={}ms); closing...", serverName, regionNode, diff); 1434 closeRegionSilently(serverNode.getServerName(), regionName); 1435 } 1436 } else if (!regionNode.isInState(State.CLOSING, State.SPLITTING)) { 1437 // So, we can get report that a region is CLOSED or SPLIT because a heartbeat 1438 // came in at about same time as a region transition. Make sure there is some 1439 // elapsed time so less false alarms. 1440 if (diff > lag) { 1441 LOG.warn("Reporting {} state does not match {} (time since last update={}ms)", 1442 serverName, regionNode, diff); 1443 } 1444 } 1445 } finally { 1446 regionNode.unlock(); 1447 } 1448 } else { 1449 LOG.warn( 1450 "Unable to acquire lock for regionNode {}. It is likely that another thread is currently holding the lock. To avoid deadlock, skip execution for now.", 1451 regionNode); 1452 } 1453 } 1454 } 1455 1456 // ============================================================================================ 1457 // RIT chore 1458 // ============================================================================================ 1459 private static class RegionInTransitionChore extends ProcedureInMemoryChore<MasterProcedureEnv> { 1460 public RegionInTransitionChore(final int timeoutMsec) { 1461 super(timeoutMsec); 1462 } 1463 1464 @Override 1465 protected void periodicExecute(final MasterProcedureEnv env) { 1466 final AssignmentManager am = env.getAssignmentManager(); 1467 1468 final RegionInTransitionStat ritStat = am.computeRegionInTransitionStat(); 1469 if (ritStat.hasRegionsOverThreshold()) { 1470 for (RegionState hri : ritStat.getRegionOverThreshold()) { 1471 am.handleRegionOverStuckWarningThreshold(hri.getRegion()); 1472 } 1473 } 1474 1475 // update metrics 1476 am.updateRegionsInTransitionMetrics(ritStat); 1477 } 1478 } 1479 1480 private static class DeadServerMetricRegionChore 
1481 extends ProcedureInMemoryChore<MasterProcedureEnv> { 1482 public DeadServerMetricRegionChore(final int timeoutMsec) { 1483 super(timeoutMsec); 1484 } 1485 1486 @Override 1487 protected void periodicExecute(final MasterProcedureEnv env) { 1488 final ServerManager sm = env.getMasterServices().getServerManager(); 1489 final AssignmentManager am = env.getAssignmentManager(); 1490 // To minimize inconsistencies we are not going to snapshot live servers in advance in case 1491 // new servers are added; OTOH we don't want to add heavy sync for a consistent view since 1492 // this is for metrics. Instead, we're going to check each regions as we go; to avoid making 1493 // too many checks, we maintain a local lists of server, limiting us to false negatives. If 1494 // we miss some recently-dead server, we'll just see it next time. 1495 Set<ServerName> recentlyLiveServers = new HashSet<>(); 1496 int deadRegions = 0, unknownRegions = 0; 1497 for (RegionStateNode rsn : am.getRegionStates().getRegionStateNodes()) { 1498 if (rsn.getState() != State.OPEN) { 1499 continue; // Opportunistic check, should quickly skip RITs, offline tables, etc. 1500 } 1501 // Do not need to acquire region state lock as this is only for showing metrics. 1502 ServerName sn = rsn.getRegionLocation(); 1503 State state = rsn.getState(); 1504 if (state != State.OPEN) { 1505 continue; // Mostly skipping RITs that are already being take care of. 1506 } 1507 if (sn == null) { 1508 ++unknownRegions; // Opened on null? 
1509 continue; 1510 } 1511 if (recentlyLiveServers.contains(sn)) { 1512 continue; 1513 } 1514 ServerManager.ServerLiveState sls = sm.isServerKnownAndOnline(sn); 1515 switch (sls) { 1516 case LIVE: 1517 recentlyLiveServers.add(sn); 1518 break; 1519 case DEAD: 1520 ++deadRegions; 1521 break; 1522 case UNKNOWN: 1523 ++unknownRegions; 1524 break; 1525 default: 1526 throw new AssertionError("Unexpected " + sls); 1527 } 1528 } 1529 if (deadRegions > 0 || unknownRegions > 0) { 1530 LOG.info("Found {} OPEN regions on dead servers and {} OPEN regions on unknown servers", 1531 deadRegions, unknownRegions); 1532 } 1533 1534 am.updateDeadServerRegionMetrics(deadRegions, unknownRegions); 1535 } 1536 } 1537 1538 public RegionInTransitionStat computeRegionInTransitionStat() { 1539 final RegionInTransitionStat rit = new RegionInTransitionStat(getConfiguration()); 1540 rit.update(this); 1541 return rit; 1542 } 1543 1544 public static class RegionInTransitionStat { 1545 private final int ritThreshold; 1546 1547 private HashMap<String, RegionState> ritsOverThreshold = null; 1548 private long statTimestamp; 1549 private long oldestRITTime = 0; 1550 private int totalRITsTwiceThreshold = 0; 1551 private int totalRITs = 0; 1552 1553 public RegionInTransitionStat(final Configuration conf) { 1554 this.ritThreshold = 1555 conf.getInt(METRICS_RIT_STUCK_WARNING_THRESHOLD, DEFAULT_RIT_STUCK_WARNING_THRESHOLD); 1556 } 1557 1558 public int getRITThreshold() { 1559 return ritThreshold; 1560 } 1561 1562 public long getTimestamp() { 1563 return statTimestamp; 1564 } 1565 1566 public int getTotalRITs() { 1567 return totalRITs; 1568 } 1569 1570 public long getOldestRITTime() { 1571 return oldestRITTime; 1572 } 1573 1574 public int getTotalRITsOverThreshold() { 1575 Map<String, RegionState> m = this.ritsOverThreshold; 1576 return m != null ? 
m.size() : 0; 1577 } 1578 1579 public boolean hasRegionsTwiceOverThreshold() { 1580 return totalRITsTwiceThreshold > 0; 1581 } 1582 1583 public boolean hasRegionsOverThreshold() { 1584 Map<String, RegionState> m = this.ritsOverThreshold; 1585 return m != null && !m.isEmpty(); 1586 } 1587 1588 public Collection<RegionState> getRegionOverThreshold() { 1589 Map<String, RegionState> m = this.ritsOverThreshold; 1590 return m != null ? m.values() : Collections.emptySet(); 1591 } 1592 1593 public boolean isRegionOverThreshold(final RegionInfo regionInfo) { 1594 Map<String, RegionState> m = this.ritsOverThreshold; 1595 return m != null && m.containsKey(regionInfo.getEncodedName()); 1596 } 1597 1598 public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) { 1599 Map<String, RegionState> m = this.ritsOverThreshold; 1600 if (m == null) { 1601 return false; 1602 } 1603 final RegionState state = m.get(regionInfo.getEncodedName()); 1604 if (state == null) { 1605 return false; 1606 } 1607 return (statTimestamp - state.getStamp()) > (ritThreshold * 2); 1608 } 1609 1610 protected void update(final AssignmentManager am) { 1611 final RegionStates regionStates = am.getRegionStates(); 1612 this.statTimestamp = EnvironmentEdgeManager.currentTime(); 1613 update(regionStates.getRegionsStateInTransition(), statTimestamp); 1614 update(regionStates.getRegionFailedOpen(), statTimestamp); 1615 1616 if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) { 1617 LOG.debug("RITs over threshold: {}", 1618 ritsOverThreshold.entrySet().stream() 1619 .map(e -> e.getKey() + ":" + e.getValue().getState().name()) 1620 .collect(Collectors.joining("\n"))); 1621 } 1622 } 1623 1624 private void update(final Collection<RegionState> regions, final long currentTime) { 1625 for (RegionState state : regions) { 1626 totalRITs++; 1627 final long ritTime = currentTime - state.getStamp(); 1628 if (ritTime > ritThreshold) { 1629 if (ritsOverThreshold == null) { 1630 
ritsOverThreshold = new HashMap<String, RegionState>(); 1631 } 1632 ritsOverThreshold.put(state.getRegion().getEncodedName(), state); 1633 totalRITsTwiceThreshold += (ritTime > (ritThreshold * 2)) ? 1 : 0; 1634 } 1635 if (oldestRITTime < ritTime) { 1636 oldestRITTime = ritTime; 1637 } 1638 } 1639 } 1640 } 1641 1642 private void updateRegionsInTransitionMetrics(final RegionInTransitionStat ritStat) { 1643 metrics.updateRITOldestAge(ritStat.getOldestRITTime()); 1644 metrics.updateRITCount(ritStat.getTotalRITs()); 1645 metrics.updateRITCountOverThreshold(ritStat.getTotalRITsOverThreshold()); 1646 } 1647 1648 private void updateDeadServerRegionMetrics(int deadRegions, int unknownRegions) { 1649 metrics.updateDeadServerOpenRegions(deadRegions); 1650 metrics.updateUnknownServerOpenRegions(unknownRegions); 1651 } 1652 1653 private void handleRegionOverStuckWarningThreshold(final RegionInfo regionInfo) { 1654 final RegionStateNode regionNode = regionStates.getRegionStateNode(regionInfo); 1655 // if (regionNode.isStuck()) { 1656 LOG.warn("STUCK Region-In-Transition {}", regionNode); 1657 } 1658 1659 // ============================================================================================ 1660 // TODO: Master load/bootstrap 1661 // ============================================================================================ 1662 public void joinCluster() throws IOException { 1663 long startTime = System.nanoTime(); 1664 LOG.debug("Joining cluster..."); 1665 1666 // Scan hbase:meta to build list of existing regions, servers, and assignment. 1667 // hbase:meta is online now or will be. Inside loadMeta, we keep trying. Can't make progress 1668 // w/o meta. 
1669 loadMeta(); 1670 1671 while (master.getServerManager().countOfRegionServers() < 1) { 1672 LOG.info("Waiting for RegionServers to join; current count={}", 1673 master.getServerManager().countOfRegionServers()); 1674 Threads.sleep(250); 1675 } 1676 LOG.info("Number of RegionServers={}", master.getServerManager().countOfRegionServers()); 1677 1678 // Start the chores 1679 master.getMasterProcedureExecutor().addChore(this.ritChore); 1680 master.getMasterProcedureExecutor().addChore(this.deadMetricChore); 1681 1682 long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); 1683 LOG.info("Joined the cluster in {}", StringUtils.humanTimeDiff(costMs)); 1684 } 1685 1686 /** 1687 * Create assign procedure for offline regions. Just follow the old 1688 * processofflineServersWithOnlineRegions method. Since now we do not need to deal with dead 1689 * server any more, we only deal with the regions in OFFLINE state in this method. And this is a 1690 * bit strange, that for new regions, we will add it in CLOSED state instead of OFFLINE state, and 1691 * usually there will be a procedure to track them. The processofflineServersWithOnlineRegions is 1692 * a legacy from long ago, as things are going really different now, maybe we do not need this 1693 * method any more. Need to revisit later. 1694 */ 1695 // Public so can be run by the Master as part of the startup. Needs hbase:meta to be online. 1696 // Needs to be done after the table state manager has been started. 
public void processOfflineRegions() {
  // Select OFFLINE regions of enabled tables and attach an assign procedure to each one that
  // does not already have a procedure.
  TransitRegionStateProcedure[] procs =
    regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
      .filter(rsn -> isTableEnabled(rsn.getRegionInfo().getTable())).map(rsn -> {
        // Hold the node lock so the "has a procedure?" check and setProcedure happen together.
        rsn.lock();
        try {
          if (rsn.getProcedure() != null) {
            // Already has a procedure attached; mapped to null and dropped by the filter below.
            return null;
          } else {
            return rsn.setProcedure(TransitRegionStateProcedure.assign(getProcedureEnvironment(),
              rsn.getRegionInfo(), null));
          }
        } finally {
          rsn.unlock();
        }
      }).filter(p -> p != null).toArray(TransitRegionStateProcedure[]::new);
  if (procs.length > 0) {
    master.getMasterProcedureExecutor().submitProcedures(procs);
  }
}

/*
 * AM internal RegionStateStore.RegionStateVisitor implementation. To be used when scanning META
 * table for region rows, using RegionStateStore utility methods. RegionStateStore methods will
 * convert Result into proper RegionInfo instances, but those would still need to be added into
 * AssignmentManager.regionStates in-memory cache. RegionMetaLoadingVisitor.visitRegionState
 * method provides the logic for adding RegionInfo instances as loaded from latest META scan into
 * AssignmentManager.regionStates.
 */
private class RegionMetaLoadingVisitor implements RegionStateStore.RegionStateVisitor {

  @Override
  public void visitRegionState(Result result, final RegionInfo regionInfo, final State state,
    final ServerName regionLocation, final ServerName lastHost, final long openSeqNum) {
    if (
      state == null && regionLocation == null && lastHost == null
        && openSeqNum == SequenceId.NO_SEQUENCE_ID
    ) {
      // This is a row with nothing in it.
      LOG.warn("Skipping empty row={}", result);
      return;
    }
    State localState = state;
    if (localState == null) {
      // No region state column data in hbase:meta table! Am I doing a rolling upgrade from
      // hbase1 to hbase2? Am I restoring a SNAPSHOT or otherwise adding a region to hbase:meta?
      // In any of these cases, state is empty. For now, presume OFFLINE but there are probably
      // cases where we need to probe more to be sure this is correct; TODO informed by
      // experience.
      LOG.info(regionInfo.getEncodedName() + " regionState=null; presuming " + State.OFFLINE);
      localState = State.OFFLINE;
    }
    RegionStateNode regionNode = regionStates.getOrCreateRegionStateNode(regionInfo);
    // Do not need to lock on regionNode, as we can make sure that before we finish loading
    // meta, all the related procedures can not be executed. The only exception is for meta
    // region related operations, but here we do not load the information for meta region.
    regionNode.setState(localState);
    regionNode.setLastHost(lastHost);
    regionNode.setRegionLocation(regionLocation);
    regionNode.setOpenSeqNum(openSeqNum);

    // Note: keep consistent with other methods, see region(Opening|Opened|Closing)
    // RIT/ServerCrash handling should take care of the transiting regions.
    if (
      localState.matches(State.OPEN, State.OPENING, State.CLOSING, State.SPLITTING, State.MERGING)
    ) {
      assert regionLocation != null : "found null region location for " + regionNode;
      // TODO: this could lead to some orphan server state nodes, as it is possible that the
      // region server is already dead and its SCP has already finished but we have
      // persisted an opening state on this region server. Finally the TRSP will assign the
      // region to another region server, so it will not cause critical problems, just waste
      // some memory as no one will try to cleanup these orphan server state nodes.
      regionStates.createServer(regionLocation);
      regionStates.addRegionToServer(regionNode);
    } else if (localState == State.OFFLINE || regionInfo.isOffline()) {
      regionStates.addToOfflineRegions(regionNode);
    }
    // Let a procedure that is already attached to this node know its state has been loaded.
    if (regionNode.getProcedure() != null) {
      regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
    }
  }
};

/**
 * Attempt to load {@code regionInfo} from META, adding any results to the in-memory
 * {@link #regionStates}. Is NOT aware of replica regions: for a non-default replica the encoded
 * name of the corresponding default replica is looked up instead.
 * @param regionInfo the region to be loaded from META.
 * @throws IOException If some error occurs while querying META or parsing results.
 */
public void populateRegionStatesFromMeta(@NonNull final RegionInfo regionInfo)
  throws IOException {
  final String regionEncodedName = RegionInfo.DEFAULT_REPLICA_ID == regionInfo.getReplicaId()
    ? regionInfo.getEncodedName()
    : RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(RegionInfo.DEFAULT_REPLICA_ID).build()
      .getEncodedName();
  populateRegionStatesFromMeta(regionEncodedName);
}

/**
 * Attempt to load {@code regionEncodedName} from META, adding any results to the in-memory
 * {@link #regionStates}. Is NOT aware of replica regions.
 * @param regionEncodedName encoded name for the region to be loaded from META.
 * @throws IOException If some error occurs while querying META or parsing results.
 */
public void populateRegionStatesFromMeta(@NonNull String regionEncodedName) throws IOException {
  final RegionMetaLoadingVisitor visitor = new RegionMetaLoadingVisitor();
  regionStateStore.visitMetaForRegion(regionEncodedName, visitor);
}

/** Visit every region row of hbase:meta and load it into the in-memory region states. */
private void loadMeta() throws IOException {
  // TODO: use a thread pool
  regionStateStore.visitMeta(new RegionMetaLoadingVisitor());
}

/**
 * Used to check if the meta loading is done.
 * <p/>
 * If not, we throw PleaseHoldException since we are rebuilding the RegionStates. The check is
 * skipped for the meta region itself.
 * @param hri region to check if it is already rebuilt
 * @throws PleaseHoldException if the AssignmentManager is not running, or if meta has not been
 *                             loaded yet and {@code hri} is not a meta region
 */
private void checkMetaLoaded(RegionInfo hri) throws PleaseHoldException {
  if (!isRunning()) {
    throw new PleaseHoldException("AssignmentManager not running");
  }
  boolean meta = isMetaRegion(hri);
  boolean metaLoaded = isMetaLoaded();
  if (!meta && !metaLoaded) {
    throw new PleaseHoldException(
      "Master not fully online; hbase:meta=" + meta + ", metaLoaded=" + metaLoaded);
  }
}

// ============================================================================================
// TODO: Metrics
// ============================================================================================
/** Placeholder that currently always returns 0. */
public int getNumRegionsOpened() {
  // TODO: Used by TestRegionPlacement.java and assume monotonically increasing value
  return 0;
}

/**
 * Usually run by the Master in reaction to server crash during normal processing. Can also be
 * invoked via external RPC to effect repair; in the latter case, the 'force' flag is set so we
 * push through the SCP though context may indicate already-running-SCP (An old SCP may have
 * exited abnormally, or damaged cluster may still have references in hbase:meta to 'Unknown
 * Servers' -- servers that are not online or in dead servers list, etc.)
 * @param serverName     the server to schedule the crash procedure for
 * @param shouldSplitWal passed through to the scheduled procedure
 * @param force          Set if the request came in externally over RPC (via hbck2). Force means
 *                       run the SCP even if it seems as though there might be an outstanding SCP
 *                       running.
 * @return pid of scheduled SCP or {@link Procedure#NO_PROC_ID} if none scheduled.
 */
public long submitServerCrash(ServerName serverName, boolean shouldSplitWal, boolean force) {
  // May be an 'Unknown Server' so handle case where serverNode is null.
  ServerStateNode serverNode = regionStates.getServerNode(serverName);
  // Remove the in-memory rsReports result
  synchronized (rsReports) {
    rsReports.remove(serverName);
  }
  if (serverNode == null) {
    if (force) {
      LOG.info("Force adding ServerCrashProcedure for {} when server node is null", serverName);
    } else {
      // for normal case, do not schedule SCP if ServerStateNode is null
      LOG.warn("Skip adding ServerCrashProcedure for {} because server node is null", serverName);
      return Procedure.NO_PROC_ID;
    }
  }

  ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
  // We hold the write lock here for fencing on reportRegionStateTransition. Once we set the
  // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
  // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
  // sure that, the region list fetched by SCP will not be changed any more.
  if (serverNode != null) {
    serverNode.writeLock().lock();
  }
  try {

    boolean carryingMeta = isCarryingMeta(serverName);
    if (serverNode != null && !serverNode.isInState(ServerState.ONLINE)) {
      if (force) {
        LOG.info("Force adding ServerCrashProcedure for {} (meta={}) when state is not {}",
          serverNode, carryingMeta, ServerState.ONLINE);
      } else {
        LOG.info("Skip adding ServerCrashProcedure for {} (meta={}) when state is not {}",
          serverNode, carryingMeta, ServerState.ONLINE);
        return Procedure.NO_PROC_ID;
      }
    }
    MasterProcedureEnv mpe = procExec.getEnvironment();
    // If serverNode == null, then 'Unknown Server'. Schedule HBCKSCP instead.
    // HBCKSCP scours Master in-memory state AND hbase:meta for references to
    // serverName just-in-case. An SCP that is scheduled when the server is
    // 'Unknown' probably originated externally with HBCK2 fix-it tool.
    ServerState oldState = null;
    if (serverNode != null) {
      // Remember the previous state only so it can be included in the log line below.
      oldState = serverNode.getState();
      serverNode.setState(ServerState.CRASHED);
    }
    ServerCrashProcedure scp = force
      ? new HBCKServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta)
      : new ServerCrashProcedure(mpe, serverName, shouldSplitWal, carryingMeta);
    long pid = procExec.submitProcedure(scp);
    LOG.info("Scheduled ServerCrashProcedure pid={} for {} (carryingMeta={}){}.", pid, serverName,
      carryingMeta,
      serverNode == null ? "" : " " + serverNode.toString() + ", oldState=" + oldState);
    return pid;
  } finally {
    if (serverNode != null) {
      serverNode.writeLock().unlock();
    }
  }
}

/** Mark the given region's state node as offline, if such a node exists. */
public void offlineRegion(final RegionInfo regionInfo) {
  // TODO used by MasterRpcServices
  RegionStateNode node = regionStates.getRegionStateNode(regionInfo);
  if (node != null) {
    node.offline();
  }
}

/** Currently a no-op. */
public void onlineRegion(final RegionInfo regionInfo, final ServerName serverName) {
  // TODO used by TestSplitTransactionOnCluster.java
}

/** Delegates to {@code regionStates.getSnapShotOfAssignment(regions)}. */
public Map<ServerName, List<RegionInfo>>
  getSnapShotOfAssignment(final Collection<RegionInfo> regions) {
  return regionStates.getSnapShotOfAssignment(regions);
}

// ============================================================================================
// TODO: UTILS/HELPERS?
1929 // ============================================================================================ 1930 /** 1931 * Used by the client (via master) to identify if all regions have the schema updates 1932 * @return Pair indicating the status of the alter command (pending/total) 1933 */ 1934 public Pair<Integer, Integer> getReopenStatus(TableName tableName) { 1935 if (isTableDisabled(tableName)) { 1936 return new Pair<Integer, Integer>(0, 0); 1937 } 1938 1939 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 1940 int ritCount = 0; 1941 for (RegionState regionState : states) { 1942 if (!regionState.isOpened() && !regionState.isSplit()) { 1943 ritCount++; 1944 } 1945 } 1946 return new Pair<Integer, Integer>(ritCount, states.size()); 1947 } 1948 1949 // ============================================================================================ 1950 // TODO: Region State In Transition 1951 // ============================================================================================ 1952 public boolean hasRegionsInTransition() { 1953 return regionStates.hasRegionsInTransition(); 1954 } 1955 1956 public List<RegionStateNode> getRegionsInTransition() { 1957 return regionStates.getRegionsInTransition(); 1958 } 1959 1960 public List<RegionInfo> getAssignedRegions() { 1961 return regionStates.getAssignedRegions(); 1962 } 1963 1964 /** 1965 * Resolve a cached {@link RegionInfo} from the region name as a {@code byte[]}. 1966 */ 1967 public RegionInfo getRegionInfo(final byte[] regionName) { 1968 final RegionStateNode regionState = regionStates.getRegionStateNodeFromName(regionName); 1969 return regionState != null ? regionState.getRegionInfo() : null; 1970 } 1971 1972 /** 1973 * Resolve a cached {@link RegionInfo} from the encoded region name as a {@code String}. 
1974 */ 1975 public RegionInfo getRegionInfo(final String encodedRegionName) { 1976 final RegionStateNode regionState = 1977 regionStates.getRegionStateNodeFromEncodedRegionName(encodedRegionName); 1978 return regionState != null ? regionState.getRegionInfo() : null; 1979 } 1980 1981 // ============================================================================================ 1982 // Expected states on region state transition. 1983 // Notice that there is expected states for transiting to OPENING state, this is because SCP. 1984 // See the comments in regionOpening method for more details. 1985 // ============================================================================================ 1986 private static final State[] STATES_EXPECTED_ON_OPEN = { State.OPENING, // Normal case 1987 State.OPEN // Retrying 1988 }; 1989 1990 private static final State[] STATES_EXPECTED_ON_CLOSING = { State.OPEN, // Normal case 1991 State.CLOSING, // Retrying 1992 State.SPLITTING, // Offline the split parent 1993 State.MERGING // Offline the merge parents 1994 }; 1995 1996 private static final State[] STATES_EXPECTED_ON_CLOSED = { State.CLOSING, // Normal case 1997 State.CLOSED // Retrying 1998 }; 1999 2000 // This is for manually scheduled region assign, can add other states later if we find out other 2001 // usages 2002 private static final State[] STATES_EXPECTED_ON_ASSIGN = { State.CLOSED, State.OFFLINE }; 2003 2004 // We only allow unassign or move a region which is in OPEN state. 2005 private static final State[] STATES_EXPECTED_ON_UNASSIGN_OR_MOVE = { State.OPEN }; 2006 2007 // ============================================================================================ 2008 // Region Status update 2009 // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking 2010 // and pre-assumptions are very tricky. 
// ============================================================================================
/**
 * Move {@code regionNode} to {@code newState} in memory and then persist it through
 * {@code regionStateStore.updateRegionLocation}. If persisting does not complete, the in-memory
 * state is set back to what it was before the call.
 * @param regionNode     the node to transition
 * @param newState       the state to move to
 * @param expectedStates passed to {@code regionNode.transitionState}; may be empty
 * @throws IOException propagated from the transition or from the store update
 */
private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
  RegionState.State... expectedStates) throws IOException {
  // Remember the current state so it can be restored if the store update fails.
  RegionState.State state = regionNode.getState();
  regionNode.transitionState(newState, expectedStates);
  boolean succ = false;
  try {
    regionStateStore.updateRegionLocation(regionNode);
    succ = true;
  } finally {
    if (!succ) {
      // revert
      regionNode.setState(state);
    }
  }
}

// should be called within the synchronized block of RegionStateNode
void regionOpening(RegionStateNode regionNode) throws IOException {
  // As in SCP, for performance reason, there is no TRSP attached with this region, we will not
  // update the region state, which means that the region could be in any state when we want to
  // assign it after a RS crash. So here we do not pass the expectedStates parameter.
  transitStateAndUpdate(regionNode, State.OPENING);
  ServerStateNode serverNode = regionStates.getServerNode(regionNode.getRegionLocation());
  // Here the server node could be null. For example, we want to assign the region to a given
  // region server and it crashes, and it is the region server which holds hbase:meta, then the
  // above transitStateAndUpdate call will never succeed until we finish the SCP for it. But
  // after the SCP finishes, the server node will be removed, so when we arrive there, the
  // server node will be null. This is not a big problem if we skip adding it, as later we will
  // fail to execute the remote procedure on the region server and then try to assign to another
  // region server
  if (serverNode != null) {
    serverNode.addRegion(regionNode);
  }
  // update the operation count metrics
  metrics.incrementOperationCounter();
}

// should be called under the RegionStateNode lock
// The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
// we will persist the FAILED_OPEN state into hbase:meta.
void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
  // Captured up front: used both for the revert below and for the server bookkeeping at the end.
  RegionState.State state = regionNode.getState();
  ServerName regionLocation = regionNode.getRegionLocation();
  if (giveUp) {
    regionNode.setState(State.FAILED_OPEN);
    regionNode.setRegionLocation(null);
    boolean succ = false;
    try {
      regionStateStore.updateRegionLocation(regionNode);
      succ = true;
    } finally {
      if (!succ) {
        // revert
        regionNode.setState(state);
        regionNode.setRegionLocation(regionLocation);
      }
    }
  }
  // Whether or not we give up, the region is no longer tracked under its previous server.
  if (regionLocation != null) {
    regionStates.removeRegionFromServer(regionLocation, regionNode);
  }
}

// should be called under the RegionStateNode lock
void regionClosing(RegionStateNode regionNode) throws IOException {
  transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);

  RegionInfo hri = regionNode.getRegionInfo();
  // Mark meta as not assigned early, so people trying to create/edit tables will wait
  if (isMetaRegion(hri)) {
    setMetaAssigned(hri, false);
  }
  // update the operation count metrics
  metrics.incrementOperationCounter();
}

// for open and close, they will first be persisted to the procedure store in
// RegionRemoteProcedureBase.
// So here we will first change the in memory state as it is considered
// as succeeded if the persistence to procedure store is succeeded, and then when the
// RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.

// should be called under the RegionStateNode lock
// In-memory only: moves the node to OPEN, registers it under its server and clears it from the
// failed-open set. Nothing is written to hbase:meta here.
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
  regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
  RegionInfo regionInfo = regionNode.getRegionInfo();
  regionStates.addRegionToServer(regionNode);
  regionStates.removeFromFailedOpen(regionInfo);
}

// should be called under the RegionStateNode lock
// In-memory only: moves the node to CLOSED, clears its location, and records the previous
// location (if any) as the last host. Nothing is written to hbase:meta here.
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
  // Read the location before it is cleared below.
  ServerName regionLocation = regionNode.getRegionLocation();
  regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
  regionNode.setRegionLocation(null);
  if (regionLocation != null) {
    regionNode.setLastHost(regionLocation);
    regionStates.removeRegionFromServer(regionLocation, regionNode);
  }
}

// should be called under the RegionStateNode lock
// for SCP
// Moves the node to ABNORMALLY_CLOSED with no location and persists that; the in-memory state
// and location are restored if the store update does not complete.
public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
  RegionState.State state = regionNode.getState();
  ServerName regionLocation = regionNode.getRegionLocation();
  regionNode.transitionState(State.ABNORMALLY_CLOSED);
  regionNode.setRegionLocation(null);
  boolean succ = false;
  try {
    regionStateStore.updateRegionLocation(regionNode);
    succ = true;
  } finally {
    if (!succ) {
      // revert
      regionNode.setState(state);
      regionNode.setRegionLocation(regionLocation);
    }
  }
  if (regionLocation != null) {
    regionNode.setLastHost(regionLocation);
    regionStates.removeRegionFromServer(regionLocation, regionNode);
  }
}

// Persists the node through regionStateStore and, for a meta region that is OPEN, marks meta
// as assigned.
void persistToMeta(RegionStateNode regionNode) throws IOException {
  regionStateStore.updateRegionLocation(regionNode);
  RegionInfo regionInfo = regionNode.getRegionInfo();
  if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
    // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
    // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
    // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
    // on table that contains state.
    setMetaAssigned(regionInfo, true);
  }
}

// ============================================================================================
// The above methods can only be called in TransitRegionStateProcedure(and related procedures)
// ============================================================================================

/**
 * Record a split: the parent node is set to SPLIT and both daughter nodes to SPLITTING_NEW in
 * memory, then the split is written through {@code regionStateStore.splitRegion}. When favored
 * nodes apply to the parent, favored nodes are generated for the daughters as well.
 * @throws IOException propagated from the store update or the favored-nodes generation
 */
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
  final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
  // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
  // The parent stays in regionStates until cleared when removed by CatalogJanitor.
  // Update its state in regionStates so it shows as offline and split when read
  // later figuring what regions are in a table and what are not: see
  // regionStates#getRegionsOfTable
  final RegionStateNode node = regionStates.getOrCreateRegionStateNode(parent);
  node.setState(State.SPLIT);
  final RegionStateNode nodeA = regionStates.getOrCreateRegionStateNode(daughterA);
  nodeA.setState(State.SPLITTING_NEW);
  final RegionStateNode nodeB = regionStates.getOrCreateRegionStateNode(daughterB);
  nodeB.setState(State.SPLITTING_NEW);

  // TODO: here we just update the parent region info in meta, to set split and offline to true,
  // without changing the one in the region node. This is a bit confusing but the region info
  // field in RegionStateNode is not expected to be changed in the current design. Need to find a
  // possible way to address this problem, or at least adding more comments about the trick to
  // deal with this problem, that when you want to filter out split parent, you need to check both
  // the RegionState on whether it is split, and also the region info. If one of them matches then
  // it is a split parent. And usually only one of them can match, as after restart, the region
  // state will be changed from SPLIT to CLOSED.
  regionStateStore.splitRegion(parent, daughterA, daughterB, serverName);
  if (shouldAssignFavoredNodes(parent)) {
    List<ServerName> onlineServers = this.master.getServerManager().getOnlineServersList();
    ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForDaughter(onlineServers, parent,
      daughterA, daughterB);
  }
}

/**
 * When called here, the merge has happened. The merged regions have been unassigned and the above
 * markRegionClosed has been called on each so they have been disassociated from a hosting Server.
 * The merged region will be open after this call. The merged regions are removed from hbase:meta
 * below. Later they are deleted from the filesystem by the catalog janitor running against
 * hbase:meta. It notices when the merged region no longer holds references to the old regions
 * (References are deleted after a compaction rewrites what the Reference points at but not until
 * the archiver chore runs, are the References removed).
 */
public void markRegionAsMerged(final RegionInfo child, final ServerName serverName,
  RegionInfo[] mergeParents) throws IOException {
  final RegionStateNode node = regionStates.getOrCreateRegionStateNode(child);
  node.setState(State.MERGED);
  // Drop the parents from the in-memory region states before updating hbase:meta.
  for (RegionInfo ri : mergeParents) {
    regionStates.deleteRegion(ri);

  }
  regionStateStore.mergeRegions(child, mergeParents, serverName);
  if (shouldAssignFavoredNodes(child)) {
    ((FavoredNodesPromoter) getBalancer()).generateFavoredNodesForMergedRegion(child,
      mergeParents);
  }
}

/*
 * Favored nodes should be applied only when FavoredNodes balancer is configured and the region
 * belongs to a non-system table.
 */
private boolean shouldAssignFavoredNodes(RegionInfo region) {
  return this.shouldAssignRegionsWithFavoredNodes
    && FavoredNodesManager.isFavoredNodeApplicable(region);
}

// ============================================================================================
// Assign Queue (Assign/Balance)
// ============================================================================================
// Region nodes waiting to be assigned; additions and the wait in waitOnAssignQueue happen while
// holding assignQueueLock.
private final ArrayList<RegionStateNode> pendingAssignQueue = new ArrayList<RegionStateNode>();
private final ReentrantLock assignQueueLock = new ReentrantLock();
// Signalled by queueAssign and assignQueueSignal; awaited in waitOnAssignQueue.
private final Condition assignQueueFullCond = assignQueueLock.newCondition();

/**
 * Add the assign operation to the assignment queue. The pending assignment operation will be
 * processed, and each region will be assigned by a server using the balancer.
 */
protected void queueAssign(final RegionStateNode regionNode) {
  regionNode.getProcedureEvent().suspend();

  // TODO: quick-start for meta and the other sys-tables?
  assignQueueLock.lock();
  try {
    pendingAssignQueue.add(regionNode);
    // Signal the waiter for a system-table region, for the first queued region, or once the
    // queue has reached assignDispatchWaitQueueMaxSize.
    if (
      regionNode.isSystemTable() || pendingAssignQueue.size() == 1
        || pendingAssignQueue.size() >= assignDispatchWaitQueueMaxSize
    ) {
      assignQueueFullCond.signal();
    }
  } finally {
    assignQueueLock.unlock();
  }
}

// Starts a daemon thread that calls processAssignQueue() for as long as isRunning() is true
// and clears the pending queue when the loop exits.
private void startAssignmentThread() {
  assignThread = new Thread(master.getServerName().toShortString()) {
    @Override
    public void run() {
      while (isRunning()) {
        processAssignQueue();
      }
      pendingAssignQueue.clear();
    }
  };
  assignThread.setDaemon(true);
  assignThread.start();
}

// Repeatedly signals the queue condition and joins the assign thread (250 ms at a time) until
// it is no longer alive. If interrupted, logs, restores the interrupt flag and stops waiting.
private void stopAssignmentThread() {
  assignQueueSignal();
  try {
    while (assignThread.isAlive()) {
      assignQueueSignal();
      assignThread.join(250);
    }
  } catch (InterruptedException e) {
    LOG.warn("join interrupted", e);
    Thread.currentThread().interrupt();
  }
}

// Signals assignQueueFullCond while holding assignQueueLock.
private void assignQueueSignal() {
  assignQueueLock.lock();
  try {
    assignQueueFullCond.signal();
  } finally {
    assignQueueLock.unlock();
  }
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
private HashMap<RegionInfo, RegionStateNode> waitOnAssignQueue() {
  HashMap<RegionInfo, RegionStateNode> regions = null;

  assignQueueLock.lock();
  try {
    if (pendingAssignQueue.isEmpty() && isRunning()) {
      assignQueueFullCond.await();
    }

    if (!isRunning()) {
      return null;
    }
    assignQueueFullCond.await(assignDispatchWaitMillis, TimeUnit.MILLISECONDS);
    regions =
new HashMap<RegionInfo, RegionStateNode>(pendingAssignQueue.size()); 2295 for (RegionStateNode regionNode : pendingAssignQueue) { 2296 regions.put(regionNode.getRegionInfo(), regionNode); 2297 } 2298 pendingAssignQueue.clear(); 2299 } catch (InterruptedException e) { 2300 LOG.warn("got interrupted ", e); 2301 Thread.currentThread().interrupt(); 2302 } finally { 2303 assignQueueLock.unlock(); 2304 } 2305 return regions; 2306 } 2307 2308 private void processAssignQueue() { 2309 final HashMap<RegionInfo, RegionStateNode> regions = waitOnAssignQueue(); 2310 if (regions == null || regions.size() == 0 || !isRunning()) { 2311 return; 2312 } 2313 2314 if (LOG.isTraceEnabled()) { 2315 LOG.trace("PROCESS ASSIGN QUEUE regionCount=" + regions.size()); 2316 } 2317 2318 // TODO: Optimize balancer. pass a RegionPlan? 2319 final HashMap<RegionInfo, ServerName> retainMap = new HashMap<>(); 2320 final List<RegionInfo> userHRIs = new ArrayList<>(regions.size()); 2321 // Regions for system tables requiring reassignment 2322 final List<RegionInfo> systemHRIs = new ArrayList<>(); 2323 for (RegionStateNode regionStateNode : regions.values()) { 2324 boolean sysTable = regionStateNode.isSystemTable(); 2325 final List<RegionInfo> hris = sysTable ? systemHRIs : userHRIs; 2326 if (regionStateNode.getRegionLocation() != null) { 2327 retainMap.put(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation()); 2328 } else { 2329 hris.add(regionStateNode.getRegionInfo()); 2330 } 2331 } 2332 2333 // TODO: connect with the listener to invalidate the cache 2334 2335 // TODO use events 2336 List<ServerName> servers = master.getServerManager().createDestinationServersList(); 2337 for (int i = 0; servers.size() < 1; ++i) { 2338 // Report every fourth time around this loop; try not to flood log. 2339 if (i % 4 == 0) { 2340 LOG.warn("No servers available; cannot place " + regions.size() + " unassigned regions."); 2341 } 2342 2343 if (!isRunning()) { 2344 LOG.debug("Stopped! 
Dropping assign of " + regions.size() + " queued regions."); 2345 return; 2346 } 2347 Threads.sleep(250); 2348 servers = master.getServerManager().createDestinationServersList(); 2349 } 2350 2351 if (!systemHRIs.isEmpty()) { 2352 // System table regions requiring reassignment are present, get region servers 2353 // not available for system table regions 2354 final List<ServerName> excludeServers = getExcludedServersForSystemTable(); 2355 List<ServerName> serversForSysTables = 2356 servers.stream().filter(s -> !excludeServers.contains(s)).collect(Collectors.toList()); 2357 if (serversForSysTables.isEmpty()) { 2358 LOG.warn("Filtering old server versions and the excluded produced an empty set; " 2359 + "instead considering all candidate servers!"); 2360 } 2361 LOG.debug("Processing assignQueue; systemServersCount=" + serversForSysTables.size() 2362 + ", allServersCount=" + servers.size()); 2363 processAssignmentPlans(regions, null, systemHRIs, 2364 serversForSysTables.isEmpty() && !containsBogusAssignments(regions, systemHRIs) 2365 ? 
servers 2366 : serversForSysTables); 2367 } 2368 2369 processAssignmentPlans(regions, retainMap, userHRIs, servers); 2370 } 2371 2372 private boolean containsBogusAssignments(Map<RegionInfo, RegionStateNode> regions, 2373 List<RegionInfo> hirs) { 2374 for (RegionInfo ri : hirs) { 2375 if ( 2376 regions.get(ri).getRegionLocation() != null 2377 && regions.get(ri).getRegionLocation().equals(LoadBalancer.BOGUS_SERVER_NAME) 2378 ) { 2379 return true; 2380 } 2381 } 2382 return false; 2383 } 2384 2385 private void processAssignmentPlans(final HashMap<RegionInfo, RegionStateNode> regions, 2386 final HashMap<RegionInfo, ServerName> retainMap, final List<RegionInfo> hris, 2387 final List<ServerName> servers) { 2388 boolean isTraceEnabled = LOG.isTraceEnabled(); 2389 if (isTraceEnabled) { 2390 LOG.trace("Available servers count=" + servers.size() + ": " + servers); 2391 } 2392 2393 final LoadBalancer balancer = getBalancer(); 2394 // ask the balancer where to place regions 2395 if (retainMap != null && !retainMap.isEmpty()) { 2396 if (isTraceEnabled) { 2397 LOG.trace("retain assign regions=" + retainMap); 2398 } 2399 try { 2400 acceptPlan(regions, balancer.retainAssignment(retainMap, servers)); 2401 } catch (HBaseIOException e) { 2402 LOG.warn("unable to retain assignment", e); 2403 addToPendingAssignment(regions, retainMap.keySet()); 2404 } 2405 } 2406 2407 // TODO: Do we need to split retain and round-robin? 2408 // the retain seems to fallback to round-robin/random if the region is not in the map. 
2409 if (!hris.isEmpty()) { 2410 Collections.sort(hris, RegionInfo.COMPARATOR); 2411 if (isTraceEnabled) { 2412 LOG.trace("round robin regions=" + hris); 2413 } 2414 try { 2415 acceptPlan(regions, balancer.roundRobinAssignment(hris, servers)); 2416 } catch (HBaseIOException e) { 2417 LOG.warn("unable to round-robin assignment", e); 2418 addToPendingAssignment(regions, hris); 2419 } 2420 } 2421 } 2422 2423 private void acceptPlan(final HashMap<RegionInfo, RegionStateNode> regions, 2424 final Map<ServerName, List<RegionInfo>> plan) throws HBaseIOException { 2425 final ProcedureEvent<?>[] events = new ProcedureEvent[regions.size()]; 2426 final long st = EnvironmentEdgeManager.currentTime(); 2427 2428 if (plan.isEmpty()) { 2429 throw new HBaseIOException("unable to compute plans for regions=" + regions.size()); 2430 } 2431 2432 int evcount = 0; 2433 for (Map.Entry<ServerName, List<RegionInfo>> entry : plan.entrySet()) { 2434 final ServerName server = entry.getKey(); 2435 for (RegionInfo hri : entry.getValue()) { 2436 final RegionStateNode regionNode = regions.get(hri); 2437 regionNode.setRegionLocation(server); 2438 if (server.equals(LoadBalancer.BOGUS_SERVER_NAME) && regionNode.isSystemTable()) { 2439 assignQueueLock.lock(); 2440 try { 2441 pendingAssignQueue.add(regionNode); 2442 } finally { 2443 assignQueueLock.unlock(); 2444 } 2445 } else { 2446 events[evcount++] = regionNode.getProcedureEvent(); 2447 } 2448 } 2449 } 2450 ProcedureEvent.wakeEvents(getProcedureScheduler(), events); 2451 2452 final long et = EnvironmentEdgeManager.currentTime(); 2453 if (LOG.isTraceEnabled()) { 2454 LOG.trace("ASSIGN ACCEPT " + events.length + " -> " + StringUtils.humanTimeDiff(et - st)); 2455 } 2456 } 2457 2458 private void addToPendingAssignment(final HashMap<RegionInfo, RegionStateNode> regions, 2459 final Collection<RegionInfo> pendingRegions) { 2460 assignQueueLock.lock(); 2461 try { 2462 for (RegionInfo hri : pendingRegions) { 2463 pendingAssignQueue.add(regions.get(hri)); 2464 
} 2465 } finally { 2466 assignQueueLock.unlock(); 2467 } 2468 } 2469 2470 /** 2471 * For a given cluster with mixed versions of servers, get a list of servers with lower versions, 2472 * where system table regions should not be assigned to. For system table, we must assign regions 2473 * to a server with highest version. However, we can disable this exclusion using config: 2474 * "hbase.min.version.move.system.tables" if checkForMinVersion is true. Detailed explanation 2475 * available with definition of minVersionToMoveSysTables. 2476 * @return List of Excluded servers for System table regions. 2477 */ 2478 public List<ServerName> getExcludedServersForSystemTable() { 2479 // TODO: This should be a cached list kept by the ServerManager rather than calculated on each 2480 // move or system region assign. The RegionServerTracker keeps list of online Servers with 2481 // RegionServerInfo that includes Version. 2482 List<Pair<ServerName, String>> serverList = 2483 master.getServerManager().getOnlineServersList().stream() 2484 .map(s -> new Pair<>(s, master.getRegionServerVersion(s))).collect(Collectors.toList()); 2485 if (serverList.isEmpty()) { 2486 return new ArrayList<>(); 2487 } 2488 String highestVersion = Collections 2489 .max(serverList, (o1, o2) -> VersionInfo.compareVersion(o1.getSecond(), o2.getSecond())) 2490 .getSecond(); 2491 if (!DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG.equals(minVersionToMoveSysTables)) { 2492 int comparedValue = VersionInfo.compareVersion(minVersionToMoveSysTables, highestVersion); 2493 if (comparedValue > 0) { 2494 return new ArrayList<>(); 2495 } 2496 } 2497 return serverList.stream().filter(pair -> !pair.getSecond().equals(highestVersion)) 2498 .map(Pair::getFirst).collect(Collectors.toList()); 2499 } 2500 2501 MasterServices getMaster() { 2502 return master; 2503 } 2504 2505 /** Returns a snapshot of rsReports */ 2506 public Map<ServerName, Set<byte[]>> getRSReports() { 2507 Map<ServerName, Set<byte[]>> rsReportsSnapshot = new 
HashMap<>(); 2508 synchronized (rsReports) { 2509 rsReports.entrySet().forEach(e -> rsReportsSnapshot.put(e.getKey(), e.getValue())); 2510 } 2511 return rsReportsSnapshot; 2512 } 2513 2514 /** 2515 * Provide regions state count for given table. e.g howmany regions of give table are 2516 * opened/closed/rit etc 2517 * @param tableName TableName 2518 * @return region states count 2519 */ 2520 public RegionStatesCount getRegionStatesCount(TableName tableName) { 2521 int openRegionsCount = 0; 2522 int closedRegionCount = 0; 2523 int ritCount = 0; 2524 int splitRegionCount = 0; 2525 int totalRegionCount = 0; 2526 if (!isTableDisabled(tableName)) { 2527 final List<RegionState> states = regionStates.getTableRegionStates(tableName); 2528 for (RegionState regionState : states) { 2529 if (regionState.isOpened()) { 2530 openRegionsCount++; 2531 } else if (regionState.isClosed()) { 2532 closedRegionCount++; 2533 } else if (regionState.isSplit()) { 2534 splitRegionCount++; 2535 } 2536 } 2537 totalRegionCount = states.size(); 2538 ritCount = totalRegionCount - openRegionsCount - splitRegionCount; 2539 } 2540 return new RegionStatesCount.RegionStatesCountBuilder().setOpenRegions(openRegionsCount) 2541 .setClosedRegions(closedRegionCount).setSplitRegions(splitRegionCount) 2542 .setRegionsInTransition(ritCount).setTotalRegions(totalRegionCount).build(); 2543 } 2544 2545}