001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.function.Function; 027import java.util.function.Supplier; 028import org.apache.commons.lang3.builder.ToStringBuilder; 029import org.apache.commons.lang3.builder.ToStringStyle; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.TableExistsException; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.TableNotFoundException; 035import org.apache.hadoop.hbase.client.RegionInfo; 036import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 037import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 038import org.apache.hadoop.hbase.procedure2.LockAndQueue; 039import org.apache.hadoop.hbase.procedure2.LockStatus; 040import org.apache.hadoop.hbase.procedure2.LockedResource; 041import org.apache.hadoop.hbase.procedure2.LockedResourceType; 042import org.apache.hadoop.hbase.procedure2.Procedure; 043import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 044import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 045import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 046import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 047import org.apache.yetus.audience.InterfaceAudience; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the 053 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the 054 * master operations can be executed concurrently, if they are operating on different tables (e.g. 055 * two create table procedures can be performed at the same time) or against two different servers; 056 * say two servers that crashed at about the same time. 057 * <p> 058 * Each procedure should implement an Interface providing information for this queue. For example 059 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed 060 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can 061 * abort all the operations preceding a delete table, or similar. 062 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue, 063 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly 064 * includes:<br> 065 * <ul> 066 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when: 067 * <ol> 068 * <li>Queue was empty before push (so must have been out of run-queue)</li> 069 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have 070 * moved Queue out of run-queue)</li> 071 * </ol> 072 * </li> 073 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when: 074 * <ol> 075 * <li>Queue becomes empty after poll</li> 076 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 077 * procedure)</li> 078 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 079 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a 080 * child</li> 081 * </ol> 082 * </li> 083 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 084 * <ol> 085 * <li>Exclusive lock</li> 086 * <li>Last shared lock (in case queue was removed because next procedure in queue required 087 * exclusive lock)</li> 088 * </ol> 089 * </li> 090 * </ul> 091 */ 092@InterfaceAudience.Private 093public class MasterProcedureScheduler extends AbstractProcedureScheduler { 094 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 095 096 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 097 (n, k) -> n.compareKey((ServerName) k); 098 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 099 (n, k) -> n.compareKey((TableName) k); 100 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 101 (n, k) -> n.compareKey((String) k); 102 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 103 (n, k) -> n.compareKey((TableName) k); 104 private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR = 105 (n, k) -> n.compareKey((String) k); 106 107 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 108 private final FairQueue<TableName> tableRunQueue = new FairQueue<>(); 109 private final FairQueue<String> peerRunQueue = new FairQueue<>(); 110 private final FairQueue<TableName> metaRunQueue = new FairQueue<>(); 111 private final FairQueue<String> globalRunQueue = new FairQueue<>(); 112 113 private final ServerQueue[] serverBuckets = new ServerQueue[128]; 114 private TableQueue tableMap = null; 115 private PeerQueue peerMap = null; 116 private MetaQueue metaMap = null; 117 private GlobalQueue globalMap = null; 118 119 private final Function<Long, Procedure<?>> procedureRetriever; 120 private final SchemaLocking locking; 121 122 // To prevent multiple Create/Modify/Disable/Enable table procedure run at the same time, we will 123 // keep table procedure in this queue first before actually enqueuing it to tableQueue 124 // Seee HBASE-28683 for more details 125 private final Map<TableName, TableProcedureWaitingQueue> tableProcsWaitingEnqueue = 126 new HashMap<>(); 127 128 public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) { 129 this.procedureRetriever = procedureRetriever; 130 locking = new SchemaLocking(procedureRetriever); 131 } 132 133 @Override 134 public void yield(final Procedure proc) { 135 push(proc, false, true); 136 } 137 138 private boolean shouldWaitBeforeEnqueuing(TableProcedureInterface proc) { 139 return TableQueue.requireTableExclusiveLock(proc); 140 } 141 142 @Override 143 protected void enqueue(final Procedure proc, final boolean addFront) { 144 if (isMetaProcedure(proc)) { 145 doAdd(metaRunQueue, getMetaQueue(), proc, addFront); 146 } else if (isTableProcedure(proc)) { 147 TableProcedureInterface tableProc = (TableProcedureInterface) proc; 148 if (shouldWaitBeforeEnqueuing(tableProc)) { 149 TableProcedureWaitingQueue waitingQueue = tableProcsWaitingEnqueue.computeIfAbsent( 150 tableProc.getTableName(), k -> new TableProcedureWaitingQueue(procedureRetriever)); 151 if (!waitingQueue.procedureSubmitted(proc)) { 152 // there is a table procedure for this table already enqueued, waiting 153 LOG.debug("There is already a procedure running for table {}, added {} to waiting queue", 154 tableProc.getTableName(), proc); 155 return; 156 } 157 } 158 doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); 159 } else if (isServerProcedure(proc)) { 160 ServerProcedureInterface spi = (ServerProcedureInterface) proc; 161 doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); 162 } else if (isPeerProcedure(proc)) { 163 doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); 164 } else if (isGlobalProcedure(proc)) { 165 doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront); 166 } else { 167 // TODO: at the moment we only have Table and Server procedures 168 // if you are implementing a non-table/non-server procedure, you have two options: create 169 // a group for all the non-table/non-server procedures or try to find a key for your 170 // non-table/non-server procedures and implement something similar to the TableRunQueue. 171 throw new UnsupportedOperationException( 172 "RQs for non-table/non-server procedures are not implemented yet: " + proc); 173 } 174 } 175 176 private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue, 177 Procedure<?> proc, boolean addFront) { 178 queue.add(proc, addFront); 179 // For the following conditions, we will put the queue back into execution 180 // 1. The procedure has already held the lock, or the lock has been restored when restarting, 181 // which means it can be executed immediately. 182 // 2. The exclusive lock for this queue has not been held. 183 // 3. The given procedure has the exclusive lock permission for this queue. 184 Supplier<String> reason = null; 185 if (proc.hasLock()) { 186 reason = () -> proc + " has lock"; 187 } else if (proc.isLockedWhenLoading()) { 188 reason = () -> proc + " restores lock when restarting"; 189 } else if (!queue.getLockStatus().hasExclusiveLock()) { 190 reason = () -> "the exclusive lock is not held by anyone when adding " + proc; 191 } else if (queue.getLockStatus().hasLockAccess(proc)) { 192 reason = () -> proc + " has the excusive lock access"; 193 } 194 if (reason != null) { 195 addToRunQueue(fairq, queue, reason); 196 } 197 } 198 199 @Override 200 protected boolean queueHasRunnables() { 201 return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables() 202 || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() 203 || peerRunQueue.hasRunnables(); 204 } 205 206 @Override 207 protected Procedure dequeue() { 208 // pull global first 209 Procedure<?> pollResult = doPoll(globalRunQueue); 210 // then meta procedure 211 if (pollResult == null) { 212 pollResult = doPoll(metaRunQueue); 213 } 214 // For now, let server handling have precedence over table handling; presumption is that it 215 // is more important handling crashed servers than it is running the 216 // enabling/disabling tables, etc. 217 if (pollResult == null) { 218 pollResult = doPoll(serverRunQueue); 219 } 220 if (pollResult == null) { 221 pollResult = doPoll(peerRunQueue); 222 } 223 if (pollResult == null) { 224 pollResult = doPoll(tableRunQueue); 225 } 226 return pollResult; 227 } 228 229 private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) { 230 LockStatus s = rq.getLockStatus(); 231 // if we have the lock access, we are ready 232 if (s.hasLockAccess(proc)) { 233 return true; 234 } 235 boolean xlockReq = rq.requireExclusiveLock(proc); 236 // if we need to hold the xlock, then we need to make sure that no one holds any lock, including 237 // the shared lock, otherwise, we just need to make sure that no one holds the xlock 238 return xlockReq ? !s.isLocked() : !s.hasExclusiveLock(); 239 } 240 241 private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) { 242 Queue<T> rq = fairq.poll(); 243 if (rq == null || !rq.isAvailable()) { 244 return null; 245 } 246 // loop until we find out a procedure which is ready to run, or if we have checked all the 247 // procedures, then we give up and remove the queue from run queue. 248 for (int i = 0, n = rq.size(); i < n; i++) { 249 Procedure<?> proc = rq.poll(); 250 if (isLockReady(proc, rq)) { 251 // the queue is empty, remove from run queue 252 if (rq.isEmpty()) { 253 removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc); 254 } 255 return proc; 256 } 257 // we are not ready to run, add back and try the next procedure 258 rq.add(proc, false); 259 } 260 // no procedure is ready for execution, remove from run queue 261 removeFromRunQueue(fairq, rq, () -> "no procedure can be executed"); 262 return null; 263 } 264 265 @Override 266 public List<LockedResource> getLocks() { 267 schedLock(); 268 try { 269 return locking.getLocks(); 270 } finally { 271 schedUnlock(); 272 } 273 } 274 275 @Override 276 public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { 277 schedLock(); 278 try { 279 return locking.getLockResource(resourceType, resourceName); 280 } finally { 281 schedUnlock(); 282 } 283 } 284 285 @Override 286 public void clear() { 287 schedLock(); 288 try { 289 clearQueue(); 290 locking.clear(); 291 } finally { 292 schedUnlock(); 293 } 294 } 295 296 private void clearQueue() { 297 // Remove Servers 298 for (int i = 0; i < serverBuckets.length; ++i) { 299 clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); 300 serverBuckets[i] = null; 301 } 302 303 // Remove Tables 304 clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); 305 tableMap = null; 306 tableProcsWaitingEnqueue.clear(); 307 308 // Remove Peers 309 clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); 310 peerMap = null; 311 312 // Remove Meta 313 clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR); 314 metaMap = null; 315 316 // Remove Global 317 clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR); 318 globalMap = null; 319 320 assert size() == 0 : "expected queue size to be 0, got " + size(); 321 } 322 323 private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap, 324 FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) { 325 while (treeMap != null) { 326 Queue<T> node = AvlTree.getFirst(treeMap); 327 treeMap = AvlTree.remove(treeMap, node.getKey(), comparator); 328 if (fairq != null) { 329 removeFromRunQueue(fairq, node, () -> "clear all queues"); 330 } 331 } 332 } 333 334 private int queueSize(Queue<?> head) { 335 int count = 0; 336 AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head); 337 while (iter.hasNext()) { 338 count += iter.next().size(); 339 } 340 return count; 341 } 342 343 @Override 344 protected int queueSize() { 345 int count = 0; 346 for (ServerQueue serverMap : serverBuckets) { 347 count += queueSize(serverMap); 348 } 349 count += queueSize(tableMap); 350 count += queueSize(peerMap); 351 count += queueSize(metaMap); 352 count += queueSize(globalMap); 353 for (TableProcedureWaitingQueue waitingQ : tableProcsWaitingEnqueue.values()) { 354 count += waitingQ.waitingSize(); 355 } 356 return count; 357 } 358 359 @Override 360 public void completionCleanup(final Procedure proc) { 361 if (isTableProcedure(proc)) { 362 TableProcedureInterface tableProc = (TableProcedureInterface) proc; 363 if (shouldWaitBeforeEnqueuing(tableProc)) { 364 schedLock(); 365 try { 366 TableProcedureWaitingQueue waitingQueue = 367 tableProcsWaitingEnqueue.get(tableProc.getTableName()); 368 if (waitingQueue != null) { 369 Optional<Procedure<?>> nextProc = waitingQueue.procedureCompleted(proc); 370 if (nextProc.isPresent()) { 371 // enqueue it 372 Procedure<?> next = nextProc.get(); 373 LOG.debug("{} completed, enqueue a new procedure {}", proc, next); 374 doAdd(tableRunQueue, getTableQueue(tableProc.getTableName()), next, false); 375 } else { 376 if (waitingQueue.isEmpty()) { 377 // there is no waiting procedures in it, remove 378 tableProcsWaitingEnqueue.remove(tableProc.getTableName()); 379 } 380 } 381 } else { 382 // this should not happen normally, warn it 383 LOG.warn("no waiting queue while completing {}, which should not happen", proc); 384 } 385 } finally { 386 schedUnlock(); 387 } 388 } 389 boolean tableDeleted; 390 if (proc.hasException()) { 391 Exception procEx = proc.getException().unwrapRemoteException(); 392 if (tableProc.getTableOperationType() == TableOperationType.CREATE) { 393 // create failed because the table already exist 394 tableDeleted = !(procEx instanceof TableExistsException); 395 } else { 396 // the operation failed because the table does not exist 397 tableDeleted = (procEx instanceof TableNotFoundException); 398 } 399 } else { 400 // the table was deleted 401 tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE); 402 } 403 if (tableDeleted) { 404 markTableAsDeleted(tableProc.getTableName(), proc); 405 } 406 } else if (proc instanceof PeerProcedureInterface) { 407 tryCleanupPeerQueue(getPeerId(proc), proc); 408 } else if (proc instanceof ServerProcedureInterface) { 409 tryCleanupServerQueue(getServerName(proc), proc); 410 } else { 411 // No cleanup for other procedure types, yet. 412 return; 413 } 414 } 415 416 private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue, 417 Supplier<String> reason) { 418 if (LOG.isTraceEnabled()) { 419 LOG.trace("Add {} to run queue because: {}", queue, reason.get()); 420 } 421 if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { 422 fairq.add(queue); 423 } 424 } 425 426 private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, 427 Queue<T> queue, Supplier<String> reason) { 428 if (LOG.isTraceEnabled()) { 429 LOG.trace("Remove {} from run queue because: {}", queue, reason.get()); 430 } 431 if (AvlIterableList.isLinked(queue)) { 432 fairq.remove(queue); 433 } 434 } 435 436 // ============================================================================ 437 // Table Queue Lookup Helpers 438 // ============================================================================ 439 private TableQueue getTableQueue(TableName tableName) { 440 TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 441 if (node != null) return node; 442 443 node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName), 444 locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); 445 tableMap = AvlTree.insert(tableMap, node); 446 return node; 447 } 448 449 private void removeTableQueue(TableName tableName) { 450 tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 451 locking.removeTableLock(tableName); 452 } 453 454 private static boolean isTableProcedure(Procedure<?> proc) { 455 return proc instanceof TableProcedureInterface; 456 } 457 458 private static TableName getTableName(Procedure<?> proc) { 459 return ((TableProcedureInterface) proc).getTableName(); 460 } 461 462 // ============================================================================ 463 // Server Queue Lookup Helpers 464 // ============================================================================ 465 private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 466 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 467 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 468 if (node != null) { 469 return node; 470 } 471 int priority; 472 if (proc != null) { 473 priority = MasterProcedureUtil.getServerPriority(proc); 474 } else { 475 priority = 1; 476 } 477 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 478 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 479 return node; 480 } 481 482 private void removeServerQueue(ServerName serverName) { 483 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 484 serverBuckets[index] = 485 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 486 locking.removeServerLock(serverName); 487 } 488 489 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 490 schedLock(); 491 try { 492 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 493 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 494 if (node == null) { 495 return; 496 } 497 498 LockAndQueue lock = locking.getServerLock(serverName); 499 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 500 removeFromRunQueue(serverRunQueue, node, 501 () -> "clean up server queue after " + proc + " completed"); 502 removeServerQueue(serverName); 503 } 504 } finally { 505 schedUnlock(); 506 } 507 } 508 509 private static int getBucketIndex(Object[] buckets, int hashCode) { 510 return Math.abs(hashCode) % buckets.length; 511 } 512 513 private static boolean isServerProcedure(Procedure<?> proc) { 514 return proc instanceof ServerProcedureInterface; 515 } 516 517 private static ServerName getServerName(Procedure<?> proc) { 518 return ((ServerProcedureInterface) proc).getServerName(); 519 } 520 521 // ============================================================================ 522 // Peer Queue Lookup Helpers 523 // ============================================================================ 524 private PeerQueue getPeerQueue(String peerId) { 525 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 526 if (node != null) { 527 return node; 528 } 529 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 530 peerMap = AvlTree.insert(peerMap, node); 531 return node; 532 } 533 534 private void removePeerQueue(String peerId) { 535 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 536 locking.removePeerLock(peerId); 537 } 538 539 private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) { 540 schedLock(); 541 try { 542 PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 543 if (queue == null) { 544 return; 545 } 546 547 final LockAndQueue lock = locking.getPeerLock(peerId); 548 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 549 removeFromRunQueue(peerRunQueue, queue, 550 () -> "clean up peer queue after " + procedure + " completed"); 551 removePeerQueue(peerId); 552 } 553 } finally { 554 schedUnlock(); 555 } 556 } 557 558 private static boolean isPeerProcedure(Procedure<?> proc) { 559 return proc instanceof PeerProcedureInterface; 560 } 561 562 private static String getPeerId(Procedure<?> proc) { 563 return ((PeerProcedureInterface) proc).getPeerId(); 564 } 565 566 // ============================================================================ 567 // Meta Queue Lookup Helpers 568 // ============================================================================ 569 private MetaQueue getMetaQueue() { 570 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 571 if (node != null) { 572 return node; 573 } 574 node = new MetaQueue(locking.getMetaLock()); 575 metaMap = AvlTree.insert(metaMap, node); 576 return node; 577 } 578 579 private static boolean isMetaProcedure(Procedure<?> proc) { 580 return proc instanceof MetaProcedureInterface; 581 } 582 583 // ============================================================================ 584 // Global Queue Lookup Helpers 585 // ============================================================================ 586 private GlobalQueue getGlobalQueue(String globalId) { 587 GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 588 if (node != null) { 589 return node; 590 } 591 node = new GlobalQueue(globalId, locking.getGlobalLock(globalId)); 592 globalMap = AvlTree.insert(globalMap, node); 593 return node; 594 } 595 596 private void removeGlobalQueue(String globalId) { 597 globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 598 locking.removeGlobalLock(globalId); 599 } 600 601 private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) { 602 schedLock(); 603 try { 604 GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR); 605 if (queue == null) { 606 return; 607 } 608 609 final LockAndQueue lock = locking.getGlobalLock(globalId); 610 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 611 removeFromRunQueue(globalRunQueue, queue, 612 () -> "clean up global queue after " + procedure + " completed"); 613 removeGlobalQueue(globalId); 614 } 615 } finally { 616 schedUnlock(); 617 } 618 } 619 620 private static boolean isGlobalProcedure(Procedure<?> proc) { 621 return proc instanceof GlobalProcedureInterface; 622 } 623 624 private static String getGlobalId(Procedure<?> proc) { 625 return ((GlobalProcedureInterface) proc).getGlobalId(); 626 } 627 628 // ============================================================================ 629 // Table Locking Helpers 630 // ============================================================================ 631 /** 632 * Get lock info for a resource of specified type and name and log details 633 */ 634 private void logLockedResource(LockedResourceType resourceType, String resourceName) { 635 if (!LOG.isDebugEnabled()) { 636 return; 637 } 638 639 LockedResource lockedResource = getLockResource(resourceType, resourceName); 640 if (lockedResource != null) { 641 String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" 642 + lockedResource.getSharedLockCount(); 643 644 Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure(); 645 if (proc != null) { 646 msg += ", exclusively locked by procId=" + proc.getProcId(); 647 } 648 LOG.debug(msg); 649 } 650 } 651 652 /** 653 * Suspend the procedure if the specified table is already locked. Other operations in the 654 * table-queue will be executed after the lock is released. 655 * @param procedure the procedure trying to acquire the lock 656 * @param table Table to lock 657 * @return true if the procedure has to wait for the table to be available 658 */ 659 public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 660 schedLock(); 661 try { 662 final String namespace = table.getNamespaceAsString(); 663 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 664 final LockAndQueue tableLock = locking.getTableLock(table); 665 if (!namespaceLock.trySharedLock(procedure)) { 666 waitProcedure(namespaceLock, procedure); 667 logLockedResource(LockedResourceType.NAMESPACE, namespace); 668 return true; 669 } 670 if (!tableLock.tryExclusiveLock(procedure)) { 671 namespaceLock.releaseSharedLock(); 672 waitProcedure(tableLock, procedure); 673 logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); 674 return true; 675 } 676 removeFromRunQueue(tableRunQueue, getTableQueue(table), 677 () -> procedure + " held the exclusive lock"); 678 return false; 679 } finally { 680 schedUnlock(); 681 } 682 } 683 684 /** 685 * Wake the procedures waiting for the specified table 686 * @param procedure the procedure releasing the lock 687 * @param table the name of the table that has the exclusive lock 688 */ 689 public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 690 schedLock(); 691 try { 692 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 693 final LockAndQueue tableLock = locking.getTableLock(table); 694 int waitingCount = 0; 695 if (tableLock.releaseExclusiveLock(procedure)) { 696 waitingCount += wakeWaitingProcedures(tableLock); 697 } 698 if (namespaceLock.releaseSharedLock()) { 699 waitingCount += wakeWaitingProcedures(namespaceLock); 700 } 701 addToRunQueue(tableRunQueue, getTableQueue(table), 702 () -> procedure + " released the exclusive lock"); 703 wakePollIfNeeded(waitingCount); 704 } finally { 705 schedUnlock(); 706 } 707 } 708 709 /** 710 * Suspend the procedure if the specified table is already locked. other "read" operations in the 711 * table-queue may be executed concurrently, 712 * @param procedure the procedure trying to acquire the lock 713 * @param table Table to lock 714 * @return true if the procedure has to wait for the table to be available 715 */ 716 public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) { 717 return waitTableQueueSharedLock(procedure, table) == null; 718 } 719 720 private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) { 721 schedLock(); 722 try { 723 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 724 final LockAndQueue tableLock = locking.getTableLock(table); 725 if (!namespaceLock.trySharedLock(procedure)) { 726 waitProcedure(namespaceLock, procedure); 727 return null; 728 } 729 730 if (!tableLock.trySharedLock(procedure)) { 731 namespaceLock.releaseSharedLock(); 732 waitProcedure(tableLock, procedure); 733 return null; 734 } 735 736 return getTableQueue(table); 737 } finally { 738 schedUnlock(); 739 } 740 } 741 742 /** 743 * Wake the procedures waiting for the specified table 744 * @param procedure the procedure releasing the lock 745 * @param table the name of the table that has the shared lock 746 */ 747 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 748 schedLock(); 749 try { 750 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 751 final LockAndQueue tableLock = locking.getTableLock(table); 752 int waitingCount = 0; 753 if (tableLock.releaseSharedLock()) { 754 addToRunQueue(tableRunQueue, getTableQueue(table), 755 () -> procedure + " released the shared lock"); 756 waitingCount += wakeWaitingProcedures(tableLock); 757 } 758 if (namespaceLock.releaseSharedLock()) { 759 waitingCount += wakeWaitingProcedures(namespaceLock); 760 } 761 wakePollIfNeeded(waitingCount); 762 } finally { 763 schedUnlock(); 764 } 765 } 766 767 /** 768 * Tries to remove the queue and the table-lock of the specified table. If there are new 769 * operations pending (e.g. a new create), the remove will not be performed. 770 * @param table the name of the table that should be marked as deleted 771 * @param procedure the procedure that is removing the table 772 * @return true if deletion succeeded, false otherwise meaning that there are other new operations 773 * pending for that table (e.g. a new create). 774 */ 775 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 776 schedLock(); 777 try { 778 final TableQueue queue = getTableQueue(table); 779 final LockAndQueue tableLock = locking.getTableLock(table); 780 if (queue == null) { 781 return true; 782 } 783 784 if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { 785 // remove the table from the run-queue and the map 786 if (AvlIterableList.isLinked(queue)) { 787 tableRunQueue.remove(queue); 788 } 789 removeTableQueue(table); 790 } else { 791 // TODO: If there are no create, we can drop all the other ops 792 return false; 793 } 794 } finally { 795 schedUnlock(); 796 } 797 return true; 798 } 799 800 // ============================================================================ 801 // Region Locking Helpers 802 // ============================================================================ 803 /** 804 * Suspend the procedure if the specified region is already locked. 805 * @param procedure the procedure trying to acquire the lock on the region 806 * @param regionInfo the region we are trying to lock 807 * @return true if the procedure has to wait for the regions to be available 808 */ 809 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 810 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 811 } 812 813 /** 814 * Suspend the procedure if the specified set of regions are already locked. 815 * @param procedure the procedure trying to acquire the lock on the regions 816 * @param table the table name of the regions we are trying to lock 817 * @param regionInfos the list of regions we are trying to lock 818 * @return true if the procedure has to wait for the regions to be available 819 */ 820 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 821 final RegionInfo... regionInfos) { 822 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 823 schedLock(); 824 try { 825 assert table != null; 826 if (waitTableSharedLock(procedure, table)) { 827 return true; 828 } 829 830 // acquire region xlocks or wait 831 boolean hasLock = true; 832 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length]; 833 for (int i = 0; i < regionInfos.length; ++i) { 834 assert regionInfos[i] != null; 835 assert regionInfos[i].getTable() != null; 836 assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure; 837 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 838 : "duplicate region: " + regionInfos[i]; 839 840 regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName()); 841 if (!regionLocks[i].tryExclusiveLock(procedure)) { 842 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 843 regionLocks[i].getExclusiveLockProcIdOwner()); 844 waitProcedure(regionLocks[i], procedure); 845 hasLock = false; 846 while (i-- > 0) { 847 regionLocks[i].releaseExclusiveLock(procedure); 848 } 849 break; 850 } else { 851 LOG.info("Took xlock for {}", procedure); 852 } 853 } 854 855 if (!hasLock) { 856 wakeTableSharedLock(procedure, table); 857 } 858 return !hasLock; 859 } finally { 860 schedUnlock(); 861 } 862 } 863 864 /** 865 * Wake the procedures waiting for the specified region 866 * @param procedure the procedure that was holding the region 867 * @param regionInfo the region the procedure was holding 868 */ 869 public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 870 wakeRegions(procedure, regionInfo.getTable(), regionInfo); 871 } 872 873 /** 874 * Wake the procedures waiting for the specified regions 875 * @param procedure the procedure that was holding the regions 876 * @param regionInfos the list of regions the procedure was holding 877 */ 878 public void wakeRegions(final Procedure<?> procedure, final TableName table, 879 final RegionInfo... regionInfos) { 880 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 881 schedLock(); 882 try { 883 int numProcs = 0; 884 final Procedure<?>[] nextProcs = new Procedure[regionInfos.length]; 885 for (int i = 0; i < regionInfos.length; ++i) { 886 assert regionInfos[i].getTable().equals(table); 887 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 888 : "duplicate region: " + regionInfos[i]; 889 890 LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName()); 891 if (regionLock.releaseExclusiveLock(procedure)) { 892 if (!regionLock.isWaitingQueueEmpty()) { 893 // release one procedure at the time since regions has an xlock 894 nextProcs[numProcs++] = regionLock.removeFirst(); 895 } else { 896 locking.removeRegionLock(regionInfos[i].getEncodedName()); 897 } 898 } 899 } 900 901 // awake procedures if any 902 for (int i = numProcs - 1; i >= 0; --i) { 903 wakeProcedure(nextProcs[i]); 904 } 905 wakePollIfNeeded(numProcs); 906 // release the table shared-lock. 907 wakeTableSharedLock(procedure, table); 908 } finally { 909 schedUnlock(); 910 } 911 } 912 913 // ============================================================================ 914 // Namespace Locking Helpers 915 // ============================================================================ 916 /** 917 * Suspend the procedure if the specified namespace is already locked. 918 * @see #wakeNamespaceExclusiveLock(Procedure,String) 919 * @param procedure the procedure trying to acquire the lock 920 * @param namespace Namespace to lock 921 * @return true if the procedure has to wait for the namespace to be available 922 */ 923 public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) { 924 schedLock(); 925 try { 926 final LockAndQueue systemNamespaceTableLock = 927 locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); 928 if (!systemNamespaceTableLock.trySharedLock(procedure)) { 929 waitProcedure(systemNamespaceTableLock, procedure); 930 logLockedResource(LockedResourceType.TABLE, 931 TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString()); 932 return true; 933 } 934 935 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 936 if (!namespaceLock.tryExclusiveLock(procedure)) { 937 systemNamespaceTableLock.releaseSharedLock(); 938 waitProcedure(namespaceLock, procedure); 939 logLockedResource(LockedResourceType.NAMESPACE, namespace); 940 return true; 941 } 942 return false; 943 } finally { 944 schedUnlock(); 945 } 946 } 947 948 /** 949 * Wake the procedures waiting for the specified namespace 950 * @see #waitNamespaceExclusiveLock(Procedure,String) 951 * @param procedure the procedure releasing the lock 952 * @param namespace the namespace that has the exclusive lock 953 */ 954 public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) { 955 schedLock(); 956 try { 957 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 958 final LockAndQueue systemNamespaceTableLock = 959 locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME); 960 int waitingCount = 0; 961 if (namespaceLock.releaseExclusiveLock(procedure)) { 962 waitingCount += wakeWaitingProcedures(namespaceLock); 963 } 964 if (systemNamespaceTableLock.releaseSharedLock()) { 965 addToRunQueue(tableRunQueue, 966 getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME), 967 () -> procedure + " released namespace exclusive lock"); 968 waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); 969 } 970 wakePollIfNeeded(waitingCount); 971 } finally { 972 schedUnlock(); 973 } 974 } 975 976 // ============================================================================ 977 // Server Locking Helpers 978 // ============================================================================ 979 /** 980 * Try to acquire the exclusive lock on the specified server. 981 * @see #wakeServerExclusiveLock(Procedure,ServerName) 982 * @param procedure the procedure trying to acquire the lock 983 * @param serverName Server to lock 984 * @return true if the procedure has to wait for the server to be available 985 */ 986 public boolean waitServerExclusiveLock(final Procedure<?> procedure, 987 final ServerName serverName) { 988 schedLock(); 989 try { 990 final LockAndQueue lock = locking.getServerLock(serverName); 991 if (lock.tryExclusiveLock(procedure)) { 992 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 993 // so. 994 removeFromRunQueue(serverRunQueue, 995 getServerQueue(serverName, 996 procedure instanceof ServerProcedureInterface 997 ? (ServerProcedureInterface) procedure 998 : null), 999 () -> procedure + " held exclusive lock"); 1000 return false; 1001 } 1002 waitProcedure(lock, procedure); 1003 logLockedResource(LockedResourceType.SERVER, serverName.getServerName()); 1004 return true; 1005 } finally { 1006 schedUnlock(); 1007 } 1008 } 1009 1010 /** 1011 * Wake the procedures waiting for the specified server 1012 * @see #waitServerExclusiveLock(Procedure,ServerName) 1013 * @param procedure the procedure releasing the lock 1014 * @param serverName the server that has the exclusive lock 1015 */ 1016 public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) { 1017 schedLock(); 1018 try { 1019 final LockAndQueue lock = locking.getServerLock(serverName); 1020 // Only SCP will acquire/release server lock so do not need to check the return value here. 1021 lock.releaseExclusiveLock(procedure); 1022 // In tests we may pass procedures other than ServerProcedureInterface, just pass null if 1023 // so. 1024 addToRunQueue(serverRunQueue, 1025 getServerQueue(serverName, 1026 procedure instanceof ServerProcedureInterface 1027 ? (ServerProcedureInterface) procedure 1028 : null), 1029 () -> procedure + " released exclusive lock"); 1030 int waitingCount = wakeWaitingProcedures(lock); 1031 wakePollIfNeeded(waitingCount); 1032 } finally { 1033 schedUnlock(); 1034 } 1035 } 1036 1037 // ============================================================================ 1038 // Peer Locking Helpers 1039 // ============================================================================ 1040 /** 1041 * Try to acquire the exclusive lock on the specified peer. 1042 * @see #wakePeerExclusiveLock(Procedure, String) 1043 * @param procedure the procedure trying to acquire the lock 1044 * @param peerId peer to lock 1045 * @return true if the procedure has to wait for the peer to be available 1046 */ 1047 public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) { 1048 schedLock(); 1049 try { 1050 final LockAndQueue lock = locking.getPeerLock(peerId); 1051 if (lock.tryExclusiveLock(procedure)) { 1052 removeFromRunQueue(peerRunQueue, getPeerQueue(peerId), 1053 () -> procedure + " held exclusive lock"); 1054 return false; 1055 } 1056 waitProcedure(lock, procedure); 1057 logLockedResource(LockedResourceType.PEER, peerId); 1058 return true; 1059 } finally { 1060 schedUnlock(); 1061 } 1062 } 1063 1064 /** 1065 * Wake the procedures waiting for the specified peer 1066 * @see #waitPeerExclusiveLock(Procedure, String) 1067 * @param procedure the procedure releasing the lock 1068 * @param peerId the peer that has the exclusive lock 1069 */ 1070 public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) { 1071 schedLock(); 1072 try { 1073 final LockAndQueue lock = locking.getPeerLock(peerId); 1074 if (lock.releaseExclusiveLock(procedure)) { 1075 addToRunQueue(peerRunQueue, getPeerQueue(peerId), 1076 () -> procedure + " released exclusive lock"); 1077 int waitingCount = wakeWaitingProcedures(lock); 1078 wakePollIfNeeded(waitingCount); 1079 } 1080 } finally { 1081 schedUnlock(); 1082 } 1083 } 1084 1085 // ============================================================================ 1086 // Meta Locking Helpers 1087 // ============================================================================ 1088 /** 1089 * Try to acquire the exclusive lock on meta. 1090 * @see #wakeMetaExclusiveLock(Procedure) 1091 * @param procedure the procedure trying to acquire the lock 1092 * @return true if the procedure has to wait for meta to be available 1093 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1094 * {@link RecoverMetaProcedure}. 1095 */ 1096 @Deprecated 1097 public boolean waitMetaExclusiveLock(Procedure<?> procedure) { 1098 schedLock(); 1099 try { 1100 final LockAndQueue lock = locking.getMetaLock(); 1101 if (lock.tryExclusiveLock(procedure)) { 1102 removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock"); 1103 return false; 1104 } 1105 waitProcedure(lock, procedure); 1106 logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString()); 1107 return true; 1108 } finally { 1109 schedUnlock(); 1110 } 1111 } 1112 1113 /** 1114 * Wake the procedures waiting for meta. 1115 * @see #waitMetaExclusiveLock(Procedure) 1116 * @param procedure the procedure releasing the lock 1117 * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with 1118 * {@link RecoverMetaProcedure}. 1119 */ 1120 @Deprecated 1121 public void wakeMetaExclusiveLock(Procedure<?> procedure) { 1122 schedLock(); 1123 try { 1124 final LockAndQueue lock = locking.getMetaLock(); 1125 lock.releaseExclusiveLock(procedure); 1126 addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock"); 1127 int waitingCount = wakeWaitingProcedures(lock); 1128 wakePollIfNeeded(waitingCount); 1129 } finally { 1130 schedUnlock(); 1131 } 1132 } 1133 1134 // ============================================================================ 1135 // Global Locking Helpers 1136 // ============================================================================ 1137 /** 1138 * Try to acquire the share lock on global. 1139 * @see #wakeGlobalExclusiveLock(Procedure, String) 1140 * @param procedure the procedure trying to acquire the lock 1141 * @return true if the procedure has to wait for global to be available 1142 */ 1143 public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) { 1144 schedLock(); 1145 try { 1146 final LockAndQueue lock = locking.getGlobalLock(globalId); 1147 if (lock.tryExclusiveLock(procedure)) { 1148 removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId), 1149 () -> procedure + " held shared lock"); 1150 return false; 1151 } 1152 waitProcedure(lock, procedure); 1153 logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING); 1154 return true; 1155 } finally { 1156 schedUnlock(); 1157 } 1158 } 1159 1160 /** 1161 * Wake the procedures waiting for global. 1162 * @see #waitGlobalExclusiveLock(Procedure, String) 1163 * @param procedure the procedure releasing the lock 1164 */ 1165 public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) { 1166 schedLock(); 1167 try { 1168 final LockAndQueue lock = locking.getGlobalLock(globalId); 1169 lock.releaseExclusiveLock(procedure); 1170 addToRunQueue(globalRunQueue, getGlobalQueue(globalId), 1171 () -> procedure + " released shared lock"); 1172 int waitingCount = wakeWaitingProcedures(lock); 1173 wakePollIfNeeded(waitingCount); 1174 } finally { 1175 schedUnlock(); 1176 } 1177 } 1178 1179 /** 1180 * For debugging. Expensive. 1181 */ 1182 public String dumpLocks() throws IOException { 1183 schedLock(); 1184 try { 1185 // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter 1186 return this.locking.toString(); 1187 } finally { 1188 schedUnlock(); 1189 } 1190 } 1191 1192 private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) { 1193 int size = queueSize(queue); 1194 if (size != 0) { 1195 builder.append(queueName, queue); 1196 } 1197 } 1198 1199 @Override 1200 public String toString() { 1201 ToStringBuilder builder = 1202 new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString()); 1203 schedLock(); 1204 try { 1205 for (int i = 0; i < serverBuckets.length; i++) { 1206 serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]); 1207 } 1208 builder.append("tableMap", tableMap); 1209 builder.append("tableWaitingMap", tableProcsWaitingEnqueue); 1210 builder.append("peerMap", peerMap); 1211 builder.append("metaMap", metaMap); 1212 builder.append("globalMap", globalMap); 1213 } finally { 1214 schedUnlock(); 1215 } 1216 return builder.build(); 1217 } 1218}