001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.function.Function; 024import java.util.function.Supplier; 025import org.apache.commons.lang3.builder.ToStringBuilder; 026import org.apache.commons.lang3.builder.ToStringStyle; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.TableExistsException; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.TableNotFoundException; 031import org.apache.hadoop.hbase.client.RegionInfo; 032import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; 033import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; 034import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; 035import org.apache.hadoop.hbase.procedure2.LockAndQueue; 036import org.apache.hadoop.hbase.procedure2.LockStatus; 037import org.apache.hadoop.hbase.procedure2.LockedResource; 038import org.apache.hadoop.hbase.procedure2.LockedResourceType; 039import 
org.apache.hadoop.hbase.procedure2.Procedure; 040import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; 041import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; 042import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; 043import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the 050 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the 051 * master operations can be executed concurrently, if they are operating on different tables (e.g. 052 * two create table procedures can be performed at the same time) or against two different servers; 053 * say two servers that crashed at about the same time. 054 * <p> 055 * Each procedure should implement an Interface providing information for this queue. For example 056 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed 057 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can 058 * abort all the operations preceding a delete table, or similar. 059 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue, 060 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). 
This mainly 061 * includes:<br> 062 * <ul> 063 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when: 064 * <ol> 065 * <li>Queue was empty before push (so must have been out of run-queue)</li> 066 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have 067 * moved Queue out of run-queue)</li> 068 * </ol> 069 * </li> 070 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when: 071 * <ol> 072 * <li>Queue becomes empty after poll</li> 073 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the 074 * procedure)</li> 075 * <li>Exclusive lock is requested but lock is not available (returns null)</li> 076 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a 077 * child</li> 078 * </ol> 079 * </li> 080 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is: 081 * <ol> 082 * <li>Exclusive lock</li> 083 * <li>Last shared lock (in case queue was removed because next procedure in queue required 084 * exclusive lock)</li> 085 * </ol> 086 * </li> 087 * </ul> 088 */ 089@InterfaceAudience.Private 090public class MasterProcedureScheduler extends AbstractProcedureScheduler { 091 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); 092 093 private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR = 094 (n, k) -> n.compareKey((ServerName) k); 095 private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR = 096 (n, k) -> n.compareKey((TableName) k); 097 private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR = 098 (n, k) -> n.compareKey((String) k); 099 private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR = 100 (n, k) -> n.compareKey((TableName) k); 101 102 private final FairQueue<ServerName> serverRunQueue = new FairQueue<>(); 103 private final 
FairQueue<TableName> tableRunQueue = new FairQueue<>(); 104 private final FairQueue<String> peerRunQueue = new FairQueue<>(); 105 private final FairQueue<TableName> metaRunQueue = new FairQueue<>(); 106 107 private final ServerQueue[] serverBuckets = new ServerQueue[128]; 108 private TableQueue tableMap = null; 109 private PeerQueue peerMap = null; 110 private MetaQueue metaMap = null; 111 112 private final SchemaLocking locking; 113 114 public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) { 115 locking = new SchemaLocking(procedureRetriever); 116 } 117 118 @Override 119 public void yield(final Procedure proc) { 120 push(proc, false, true); 121 } 122 123 @Override 124 protected void enqueue(final Procedure proc, final boolean addFront) { 125 if (isMetaProcedure(proc)) { 126 doAdd(metaRunQueue, getMetaQueue(), proc, addFront); 127 } else if (isTableProcedure(proc)) { 128 doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); 129 } else if (isServerProcedure(proc)) { 130 ServerProcedureInterface spi = (ServerProcedureInterface) proc; 131 doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); 132 } else if (isPeerProcedure(proc)) { 133 doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); 134 } else { 135 // TODO: at the moment we only have Table and Server procedures 136 // if you are implementing a non-table/non-server procedure, you have two options: create 137 // a group for all the non-table/non-server procedures or try to find a key for your 138 // non-table/non-server procedures and implement something similar to the TableRunQueue. 
139 throw new UnsupportedOperationException( 140 "RQs for non-table/non-server procedures are not implemented yet: " + proc); 141 } 142 } 143 144 private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue, 145 Procedure<?> proc, boolean addFront) { 146 queue.add(proc, addFront); 147 // For the following conditions, we will put the queue back into execution 148 // 1. The procedure has already held the lock, or the lock has been restored when restarting, 149 // which means it can be executed immediately. 150 // 2. The exclusive lock for this queue has not been held. 151 // 3. The given procedure has the exclusive lock permission for this queue. 152 Supplier<String> reason = null; 153 if (proc.hasLock()) { 154 reason = () -> proc + " has lock"; 155 } else if (proc.isLockedWhenLoading()) { 156 reason = () -> proc + " restores lock when restarting"; 157 } else if (!queue.getLockStatus().hasExclusiveLock()) { 158 reason = () -> "the exclusive lock is not held by anyone when adding " + proc; 159 } else if (queue.getLockStatus().hasLockAccess(proc)) { 160 reason = () -> proc + " has the excusive lock access"; 161 } 162 if (reason != null) { 163 addToRunQueue(fairq, queue, reason); 164 } 165 } 166 167 @Override 168 protected boolean queueHasRunnables() { 169 return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() 170 || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables(); 171 } 172 173 @Override 174 protected Procedure dequeue() { 175 // meta procedure is always the first priority 176 Procedure<?> pollResult = doPoll(metaRunQueue); 177 // For now, let server handling have precedence over table handling; presumption is that it 178 // is more important handling crashed servers than it is running the 179 // enabling/disabling tables, etc. 
180 if (pollResult == null) { 181 pollResult = doPoll(serverRunQueue); 182 } 183 if (pollResult == null) { 184 pollResult = doPoll(peerRunQueue); 185 } 186 if (pollResult == null) { 187 pollResult = doPoll(tableRunQueue); 188 } 189 return pollResult; 190 } 191 192 private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) { 193 LockStatus s = rq.getLockStatus(); 194 // if we have the lock access, we are ready 195 if (s.hasLockAccess(proc)) { 196 return true; 197 } 198 boolean xlockReq = rq.requireExclusiveLock(proc); 199 // if we need to hold the xlock, then we need to make sure that no one holds any lock, including 200 // the shared lock, otherwise, we just need to make sure that no one holds the xlock 201 return xlockReq ? !s.isLocked() : !s.hasExclusiveLock(); 202 } 203 204 private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) { 205 Queue<T> rq = fairq.poll(); 206 if (rq == null || !rq.isAvailable()) { 207 return null; 208 } 209 // loop until we find out a procedure which is ready to run, or if we have checked all the 210 // procedures, then we give up and remove the queue from run queue. 
211 for (int i = 0, n = rq.size(); i < n; i++) { 212 Procedure<?> proc = rq.poll(); 213 if (isLockReady(proc, rq)) { 214 // the queue is empty, remove from run queue 215 if (rq.isEmpty()) { 216 removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc); 217 } 218 return proc; 219 } 220 // we are not ready to run, add back and try the next procedure 221 rq.add(proc, false); 222 } 223 // no procedure is ready for execution, remove from run queue 224 removeFromRunQueue(fairq, rq, () -> "no procedure can be executed"); 225 return null; 226 } 227 228 @Override 229 public List<LockedResource> getLocks() { 230 schedLock(); 231 try { 232 return locking.getLocks(); 233 } finally { 234 schedUnlock(); 235 } 236 } 237 238 @Override 239 public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { 240 schedLock(); 241 try { 242 return locking.getLockResource(resourceType, resourceName); 243 } finally { 244 schedUnlock(); 245 } 246 } 247 248 @Override 249 public void clear() { 250 schedLock(); 251 try { 252 clearQueue(); 253 locking.clear(); 254 } finally { 255 schedUnlock(); 256 } 257 } 258 259 private void clearQueue() { 260 // Remove Servers 261 for (int i = 0; i < serverBuckets.length; ++i) { 262 clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); 263 serverBuckets[i] = null; 264 } 265 266 // Remove Tables 267 clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); 268 tableMap = null; 269 270 // Remove Peers 271 clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); 272 peerMap = null; 273 274 assert size() == 0 : "expected queue size to be 0, got " + size(); 275 } 276 277 private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap, 278 FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) { 279 while (treeMap != null) { 280 Queue<T> node = AvlTree.getFirst(treeMap); 281 treeMap = AvlTree.remove(treeMap, node.getKey(), comparator); 282 if (fairq != null) { 283 
removeFromRunQueue(fairq, node, () -> "clear all queues"); 284 } 285 } 286 } 287 288 private int queueSize(Queue<?> head) { 289 int count = 0; 290 AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head); 291 while (iter.hasNext()) { 292 count += iter.next().size(); 293 } 294 return count; 295 } 296 297 @Override 298 protected int queueSize() { 299 int count = 0; 300 for (ServerQueue serverMap : serverBuckets) { 301 count += queueSize(serverMap); 302 } 303 count += queueSize(tableMap); 304 count += queueSize(peerMap); 305 count += queueSize(metaMap); 306 return count; 307 } 308 309 @Override 310 public void completionCleanup(final Procedure proc) { 311 if (proc instanceof TableProcedureInterface) { 312 TableProcedureInterface iProcTable = (TableProcedureInterface) proc; 313 boolean tableDeleted; 314 if (proc.hasException()) { 315 Exception procEx = proc.getException().unwrapRemoteException(); 316 if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { 317 // create failed because the table already exist 318 tableDeleted = !(procEx instanceof TableExistsException); 319 } else { 320 // the operation failed because the table does not exist 321 tableDeleted = (procEx instanceof TableNotFoundException); 322 } 323 } else { 324 // the table was deleted 325 tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); 326 } 327 if (tableDeleted) { 328 markTableAsDeleted(iProcTable.getTableName(), proc); 329 return; 330 } 331 } else if (proc instanceof PeerProcedureInterface) { 332 tryCleanupPeerQueue(getPeerId(proc), proc); 333 } else if (proc instanceof ServerProcedureInterface) { 334 tryCleanupServerQueue(getServerName(proc), proc); 335 } else { 336 // No cleanup for other procedure types, yet. 
337 return; 338 } 339 } 340 341 private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue, 342 Supplier<String> reason) { 343 if (LOG.isTraceEnabled()) { 344 LOG.trace("Add {} to run queue because: {}", queue, reason.get()); 345 } 346 if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) { 347 fairq.add(queue); 348 } 349 } 350 351 private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, 352 Queue<T> queue, Supplier<String> reason) { 353 if (LOG.isTraceEnabled()) { 354 LOG.trace("Remove {} from run queue because: {}", queue, reason.get()); 355 } 356 if (AvlIterableList.isLinked(queue)) { 357 fairq.remove(queue); 358 } 359 } 360 361 // ============================================================================ 362 // Table Queue Lookup Helpers 363 // ============================================================================ 364 private TableQueue getTableQueue(TableName tableName) { 365 TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 366 if (node != null) return node; 367 368 node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName), 369 locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); 370 tableMap = AvlTree.insert(tableMap, node); 371 return node; 372 } 373 374 private void removeTableQueue(TableName tableName) { 375 tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); 376 locking.removeTableLock(tableName); 377 } 378 379 private static boolean isTableProcedure(Procedure<?> proc) { 380 return proc instanceof TableProcedureInterface; 381 } 382 383 private static TableName getTableName(Procedure<?> proc) { 384 return ((TableProcedureInterface) proc).getTableName(); 385 } 386 387 // ============================================================================ 388 // Server Queue Lookup Helpers 389 // ============================================================================ 
390 private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) { 391 final int index = getBucketIndex(serverBuckets, serverName.hashCode()); 392 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 393 if (node != null) { 394 return node; 395 } 396 int priority; 397 if (proc != null) { 398 priority = MasterProcedureUtil.getServerPriority(proc); 399 } else { 400 priority = 1; 401 } 402 node = new ServerQueue(serverName, priority, locking.getServerLock(serverName)); 403 serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); 404 return node; 405 } 406 407 private void removeServerQueue(ServerName serverName) { 408 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 409 serverBuckets[index] = 410 AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 411 locking.removeServerLock(serverName); 412 } 413 414 private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) { 415 schedLock(); 416 try { 417 int index = getBucketIndex(serverBuckets, serverName.hashCode()); 418 ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); 419 if (node == null) { 420 return; 421 } 422 423 LockAndQueue lock = locking.getServerLock(serverName); 424 if (node.isEmpty() && lock.tryExclusiveLock(proc)) { 425 removeFromRunQueue(serverRunQueue, node, 426 () -> "clean up server queue after " + proc + " completed"); 427 removeServerQueue(serverName); 428 } 429 } finally { 430 schedUnlock(); 431 } 432 } 433 434 private static int getBucketIndex(Object[] buckets, int hashCode) { 435 return Math.abs(hashCode) % buckets.length; 436 } 437 438 private static boolean isServerProcedure(Procedure<?> proc) { 439 return proc instanceof ServerProcedureInterface; 440 } 441 442 private static ServerName getServerName(Procedure<?> proc) { 443 return ((ServerProcedureInterface) proc).getServerName(); 444 } 445 446 // 
============================================================================ 447 // Peer Queue Lookup Helpers 448 // ============================================================================ 449 private PeerQueue getPeerQueue(String peerId) { 450 PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 451 if (node != null) { 452 return node; 453 } 454 node = new PeerQueue(peerId, locking.getPeerLock(peerId)); 455 peerMap = AvlTree.insert(peerMap, node); 456 return node; 457 } 458 459 private void removePeerQueue(String peerId) { 460 peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 461 locking.removePeerLock(peerId); 462 } 463 464 private void tryCleanupPeerQueue(String peerId, Procedure procedure) { 465 schedLock(); 466 try { 467 PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); 468 if (queue == null) { 469 return; 470 } 471 472 final LockAndQueue lock = locking.getPeerLock(peerId); 473 if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) { 474 removeFromRunQueue(peerRunQueue, queue, 475 () -> "clean up peer queue after " + procedure + " completed"); 476 removePeerQueue(peerId); 477 } 478 } finally { 479 schedUnlock(); 480 } 481 } 482 483 private static boolean isPeerProcedure(Procedure<?> proc) { 484 return proc instanceof PeerProcedureInterface; 485 } 486 487 private static String getPeerId(Procedure<?> proc) { 488 return ((PeerProcedureInterface) proc).getPeerId(); 489 } 490 491 // ============================================================================ 492 // Meta Queue Lookup Helpers 493 // ============================================================================ 494 private MetaQueue getMetaQueue() { 495 MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR); 496 if (node != null) { 497 return node; 498 } 499 node = new MetaQueue(locking.getMetaLock()); 500 metaMap = AvlTree.insert(metaMap, node); 501 return node; 502 } 503 504 private static 
boolean isMetaProcedure(Procedure<?> proc) { 505 return proc instanceof MetaProcedureInterface; 506 } 507 508 // ============================================================================ 509 // Table Locking Helpers 510 // ============================================================================ 511 /** 512 * Get lock info for a resource of specified type and name and log details 513 */ 514 private void logLockedResource(LockedResourceType resourceType, String resourceName) { 515 if (!LOG.isDebugEnabled()) { 516 return; 517 } 518 519 LockedResource lockedResource = getLockResource(resourceType, resourceName); 520 if (lockedResource != null) { 521 String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" 522 + lockedResource.getSharedLockCount(); 523 524 Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure(); 525 if (proc != null) { 526 msg += ", exclusively locked by procId=" + proc.getProcId(); 527 } 528 LOG.debug(msg); 529 } 530 } 531 532 /** 533 * Suspend the procedure if the specified table is already locked. Other operations in the 534 * table-queue will be executed after the lock is released. 
535 * @param procedure the procedure trying to acquire the lock 536 * @param table Table to lock 537 * @return true if the procedure has to wait for the table to be available 538 */ 539 public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 540 schedLock(); 541 try { 542 final String namespace = table.getNamespaceAsString(); 543 final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); 544 final LockAndQueue tableLock = locking.getTableLock(table); 545 if (!namespaceLock.trySharedLock(procedure)) { 546 waitProcedure(namespaceLock, procedure); 547 logLockedResource(LockedResourceType.NAMESPACE, namespace); 548 return true; 549 } 550 if (!tableLock.tryExclusiveLock(procedure)) { 551 namespaceLock.releaseSharedLock(); 552 waitProcedure(tableLock, procedure); 553 logLockedResource(LockedResourceType.TABLE, table.getNameAsString()); 554 return true; 555 } 556 removeFromRunQueue(tableRunQueue, getTableQueue(table), 557 () -> procedure + " held the exclusive lock"); 558 return false; 559 } finally { 560 schedUnlock(); 561 } 562 } 563 564 /** 565 * Wake the procedures waiting for the specified table 566 * @param procedure the procedure releasing the lock 567 * @param table the name of the table that has the exclusive lock 568 */ 569 public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) { 570 schedLock(); 571 try { 572 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 573 final LockAndQueue tableLock = locking.getTableLock(table); 574 int waitingCount = 0; 575 if (tableLock.releaseExclusiveLock(procedure)) { 576 waitingCount += wakeWaitingProcedures(tableLock); 577 } 578 if (namespaceLock.releaseSharedLock()) { 579 waitingCount += wakeWaitingProcedures(namespaceLock); 580 } 581 addToRunQueue(tableRunQueue, getTableQueue(table), 582 () -> procedure + " released the exclusive lock"); 583 wakePollIfNeeded(waitingCount); 584 } finally { 585 
schedUnlock(); 586 } 587 } 588 589 /** 590 * Suspend the procedure if the specified table is already locked. other "read" operations in the 591 * table-queue may be executed concurrently, 592 * @param procedure the procedure trying to acquire the lock 593 * @param table Table to lock 594 * @return true if the procedure has to wait for the table to be available 595 */ 596 public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) { 597 return waitTableQueueSharedLock(procedure, table) == null; 598 } 599 600 private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) { 601 schedLock(); 602 try { 603 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 604 final LockAndQueue tableLock = locking.getTableLock(table); 605 if (!namespaceLock.trySharedLock(procedure)) { 606 waitProcedure(namespaceLock, procedure); 607 return null; 608 } 609 610 if (!tableLock.trySharedLock(procedure)) { 611 namespaceLock.releaseSharedLock(); 612 waitProcedure(tableLock, procedure); 613 return null; 614 } 615 616 return getTableQueue(table); 617 } finally { 618 schedUnlock(); 619 } 620 } 621 622 /** 623 * Wake the procedures waiting for the specified table 624 * @param procedure the procedure releasing the lock 625 * @param table the name of the table that has the shared lock 626 */ 627 public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) { 628 schedLock(); 629 try { 630 final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); 631 final LockAndQueue tableLock = locking.getTableLock(table); 632 int waitingCount = 0; 633 if (tableLock.releaseSharedLock()) { 634 addToRunQueue(tableRunQueue, getTableQueue(table), 635 () -> procedure + " released the shared lock"); 636 waitingCount += wakeWaitingProcedures(tableLock); 637 } 638 if (namespaceLock.releaseSharedLock()) { 639 waitingCount += wakeWaitingProcedures(namespaceLock); 640 
} 641 wakePollIfNeeded(waitingCount); 642 } finally { 643 schedUnlock(); 644 } 645 } 646 647 /** 648 * Tries to remove the queue and the table-lock of the specified table. If there are new 649 * operations pending (e.g. a new create), the remove will not be performed. 650 * @param table the name of the table that should be marked as deleted 651 * @param procedure the procedure that is removing the table 652 * @return true if deletion succeeded, false otherwise meaning that there are other new operations 653 * pending for that table (e.g. a new create). 654 */ 655 boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) { 656 schedLock(); 657 try { 658 final TableQueue queue = getTableQueue(table); 659 final LockAndQueue tableLock = locking.getTableLock(table); 660 if (queue == null) return true; 661 662 if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) { 663 // remove the table from the run-queue and the map 664 if (AvlIterableList.isLinked(queue)) { 665 tableRunQueue.remove(queue); 666 } 667 removeTableQueue(table); 668 } else { 669 // TODO: If there are no create, we can drop all the other ops 670 return false; 671 } 672 } finally { 673 schedUnlock(); 674 } 675 return true; 676 } 677 678 // ============================================================================ 679 // Region Locking Helpers 680 // ============================================================================ 681 /** 682 * Suspend the procedure if the specified region is already locked. 683 * @param procedure the procedure trying to acquire the lock on the region 684 * @param regionInfo the region we are trying to lock 685 * @return true if the procedure has to wait for the regions to be available 686 */ 687 public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 688 return waitRegions(procedure, regionInfo.getTable(), regionInfo); 689 } 690 691 /** 692 * Suspend the procedure if the specified set of regions are already locked. 
693 * @param procedure the procedure trying to acquire the lock on the regions 694 * @param table the table name of the regions we are trying to lock 695 * @param regionInfos the list of regions we are trying to lock 696 * @return true if the procedure has to wait for the regions to be available 697 */ 698 public boolean waitRegions(final Procedure<?> procedure, final TableName table, 699 final RegionInfo... regionInfos) { 700 Arrays.sort(regionInfos, RegionInfo.COMPARATOR); 701 schedLock(); 702 try { 703 assert table != null; 704 if (waitTableSharedLock(procedure, table)) { 705 return true; 706 } 707 708 // acquire region xlocks or wait 709 boolean hasLock = true; 710 final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length]; 711 for (int i = 0; i < regionInfos.length; ++i) { 712 assert regionInfos[i] != null; 713 assert regionInfos[i].getTable() != null; 714 assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure; 715 assert i == 0 || regionInfos[i] != regionInfos[i - 1] 716 : "duplicate region: " + regionInfos[i]; 717 718 regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName()); 719 if (!regionLocks[i].tryExclusiveLock(procedure)) { 720 LOG.info("Waiting on xlock for {} held by pid={}", procedure, 721 regionLocks[i].getExclusiveLockProcIdOwner()); 722 waitProcedure(regionLocks[i], procedure); 723 hasLock = false; 724 while (i-- > 0) { 725 regionLocks[i].releaseExclusiveLock(procedure); 726 } 727 break; 728 } else { 729 LOG.info("Took xlock for {}", procedure); 730 } 731 } 732 733 if (!hasLock) { 734 wakeTableSharedLock(procedure, table); 735 } 736 return !hasLock; 737 } finally { 738 schedUnlock(); 739 } 740 } 741 742 /** 743 * Wake the procedures waiting for the specified region 744 * @param procedure the procedure that was holding the region 745 * @param regionInfo the region the procedure was holding 746 */ 747 public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) { 748 
wakeRegions(procedure, regionInfo.getTable(), regionInfo);
  }

  /**
   * Release the exclusive locks on the given regions and wake the procedures waiting on them.
   * <p>
   * The {@code regionInfos} array is sorted in place with {@link RegionInfo#COMPARATOR}. For every
   * region whose exclusive lock is released, at most one waiting procedure is resumed; when nobody
   * is waiting the region lock is dropped via {@code removeRegionLock}. Finally the table
   * shared-lock is released through {@code wakeTableSharedLock}.
   * @param procedure   the procedure that was holding the regions
   * @param table       the table every region in {@code regionInfos} belongs to
   * @param regionInfos the list of regions the procedure was holding
   */
  public void wakeRegions(final Procedure<?> procedure, final TableName table,
    final RegionInfo... regionInfos) {
    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
    schedLock();
    try {
      final Procedure<?>[] toWake = new Procedure[regionInfos.length];
      int wakeCount = 0;
      for (int idx = 0; idx < regionInfos.length; ++idx) {
        final RegionInfo region = regionInfos[idx];
        assert region.getTable().equals(table);
        assert idx == 0 || region != regionInfos[idx - 1] : "duplicate region: " + region;

        final String encodedName = region.getEncodedName();
        final LockAndQueue regionLock = locking.getRegionLock(encodedName);
        if (!regionLock.releaseExclusiveLock(procedure)) {
          continue;
        }
        if (regionLock.isWaitingQueueEmpty()) {
          // nobody is queued on this region, so the lock entry is no longer needed
          locking.removeRegionLock(encodedName);
        } else {
          // release one procedure at the time since regions has an xlock
          toWake[wakeCount++] = regionLock.removeFirst();
        }
      }

      // awake procedures if any, walking the collected array from last to first
      int remaining = wakeCount;
      while (remaining > 0) {
        wakeProcedure(toWake[--remaining]);
      }
      wakePollIfNeeded(wakeCount);
      // release the table shared-lock.
      wakeTableSharedLock(procedure, table);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  // Namespace Locking Helpers
  // ============================================================================
  /**
   * Suspend the procedure if the specified namespace is already locked.
   * <p>
   * A shared lock on {@link TableName#NAMESPACE_TABLE_NAME} is taken first, then the exclusive
   * lock on the namespace. If the namespace lock cannot be taken, the shared lock is released
   * again before the procedure is queued on the namespace lock.
   * @see #wakeNamespaceExclusiveLock(Procedure,String)
   * @param procedure the procedure trying to acquire the lock
   * @param namespace Namespace to lock
   * @return true if the procedure has to wait for the namespace to be available
   */
  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
    schedLock();
    try {
      final LockAndQueue nsTableLock = locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
      if (!nsTableLock.trySharedLock(procedure)) {
        waitProcedure(nsTableLock, procedure);
        logLockedResource(LockedResourceType.TABLE,
          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
        return true;
      }

      final LockAndQueue nsLock = locking.getNamespaceLock(namespace);
      if (nsLock.tryExclusiveLock(procedure)) {
        return false;
      }
      // could not get the namespace: give the shared table lock back before queueing
      nsTableLock.releaseSharedLock();
      waitProcedure(nsLock, procedure);
      logLockedResource(LockedResourceType.NAMESPACE, namespace);
      return true;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified namespace.
   * <p>
   * Releases the exclusive namespace lock and the shared lock on
   * {@link TableName#NAMESPACE_TABLE_NAME}, waking the waiters of whichever lock was actually
   * released.
   * @see #waitNamespaceExclusiveLock(Procedure,String)
   * @param procedure the procedure releasing the lock
   * @param namespace the namespace that has the exclusive lock
   */
  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
    schedLock();
    try {
      final LockAndQueue nsLock = locking.getNamespaceLock(namespace);
      final LockAndQueue nsTableLock = locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
      int wokenCount = nsLock.releaseExclusiveLock(procedure) ? wakeWaitingProcedures(nsLock) : 0;
      if (nsTableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
          () -> procedure + " released namespace exclusive lock");
        wokenCount += wakeWaitingProcedures(nsTableLock);
      }
      wakePollIfNeeded(wokenCount);
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  // Server Locking Helpers
  // ============================================================================
  /**
   * Try to acquire the exclusive lock on the specified server.
   * <p>
   * On success the server's queue is removed from the server run queue; otherwise the procedure
   * is queued on the lock.
   * @see #wakeServerExclusiveLock(Procedure,ServerName)
   * @param procedure  the procedure trying to acquire the lock
   * @param serverName Server to lock
   * @return true if the procedure has to wait for the server to be available
   */
  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
    final ServerName serverName) {
    schedLock();
    try {
      final LockAndQueue serverLock = locking.getServerLock(serverName);
      if (!serverLock.tryExclusiveLock(procedure)) {
        waitProcedure(serverLock, procedure);
        logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
        return true;
      }
      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
      // so.
      final ServerProcedureInterface serverProc =
        procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure : null;
      removeFromRunQueue(serverRunQueue, getServerQueue(serverName, serverProc),
        () -> procedure + " held exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified server.
   * <p>
   * The server's queue is put back on the server run queue regardless of what the lock release
   * returns.
   * @see #waitServerExclusiveLock(Procedure,ServerName)
   * @param procedure  the procedure releasing the lock
   * @param serverName the server that has the exclusive lock
   */
  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
    schedLock();
    try {
      final LockAndQueue serverLock = locking.getServerLock(serverName);
      // Only SCP will acquire/release server lock so do not need to check the return value here.
      serverLock.releaseExclusiveLock(procedure);
      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
      // so.
      final ServerProcedureInterface serverProc =
        procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure : null;
      addToRunQueue(serverRunQueue, getServerQueue(serverName, serverProc),
        () -> procedure + " released exclusive lock");
      wakePollIfNeeded(wakeWaitingProcedures(serverLock));
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  // Peer Locking Helpers
  // ============================================================================
  /**
   * Tell whether a peer procedure needs the exclusive peer lock.
   * @param proc the peer procedure to inspect
   * @return true for every peer operation type except {@link PeerOperationType#REFRESH}
   */
  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
    final PeerOperationType opType = proc.getPeerOperationType();
    return opType != PeerOperationType.REFRESH;
  }

  /**
   * Try to acquire the exclusive lock on the specified peer.
   * <p>
   * On success the peer's queue is removed from the peer run queue; otherwise the procedure is
   * queued on the lock.
   * @see #wakePeerExclusiveLock(Procedure, String)
   * @param procedure the procedure trying to acquire the lock
   * @param peerId    peer to lock
   * @return true if the procedure has to wait for the peer to be available
   */
  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
    schedLock();
    try {
      final LockAndQueue peerLock = locking.getPeerLock(peerId);
      if (!peerLock.tryExclusiveLock(procedure)) {
        waitProcedure(peerLock, procedure);
        logLockedResource(LockedResourceType.PEER, peerId);
        return true;
      }
      removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
        () -> procedure + " held exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for the specified peer.
   * <p>
   * Nothing is re-queued or woken unless the lock release reports success.
   * @see #waitPeerExclusiveLock(Procedure, String)
   * @param procedure the procedure releasing the lock
   * @param peerId    the peer that has the exclusive lock
   */
  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
    schedLock();
    try {
      final LockAndQueue peerLock = locking.getPeerLock(peerId);
      if (!peerLock.releaseExclusiveLock(procedure)) {
        return;
      }
      addToRunQueue(peerRunQueue, getPeerQueue(peerId),
        () -> procedure + " released exclusive lock");
      wakePollIfNeeded(wakeWaitingProcedures(peerLock));
    } finally {
      schedUnlock();
    }
  }

  // ============================================================================
  // Meta Locking Helpers
  // ============================================================================
  /**
   * Try to acquire the exclusive lock on meta.
   * <p>
   * On success the meta queue is removed from the meta run queue; otherwise the procedure is
   * queued on the lock.
   * @see #wakeMetaExclusiveLock(Procedure)
   * @param procedure the procedure trying to acquire the lock
   * @return true if the procedure has to wait for meta to be available
   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
   *             {@link RecoverMetaProcedure}.
   */
  @Deprecated
  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
    schedLock();
    try {
      final LockAndQueue metaLock = locking.getMetaLock();
      if (!metaLock.tryExclusiveLock(procedure)) {
        waitProcedure(metaLock, procedure);
        logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
        return true;
      }
      removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }

  /**
   * Wake the procedures waiting for meta.
   * <p>
   * The meta queue is put back on the meta run queue regardless of what the lock release returns.
   * @see #waitMetaExclusiveLock(Procedure)
   * @param procedure the procedure releasing the lock
   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
   *             {@link RecoverMetaProcedure}.
   */
  @Deprecated
  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
    schedLock();
    try {
      final LockAndQueue metaLock = locking.getMetaLock();
      metaLock.releaseExclusiveLock(procedure);
      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
      wakePollIfNeeded(wakeWaitingProcedures(metaLock));
    } finally {
      schedUnlock();
    }
  }

  /**
   * For debugging. Expensive.
   * @return the string form of the lock state, taken while holding the scheduler lock
   * @throws IOException declared by the signature; nothing in this body throws it
   */
  public String dumpLocks() throws IOException {
    final String dump;
    schedLock();
    try {
      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
      dump = this.locking.toString();
    } finally {
      schedUnlock();
    }
    return dump;
  }

  /**
   * Append a queue to the builder under the given field name, skipping queues whose
   * {@code queueSize} is zero.
   * @param builder   the builder collecting the string form
   * @param queueName the field name to report the queue under
   * @param queue     the queue to report
   */
  private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) {
    if (queueSize(queue) == 0) {
      return;
    }
    builder.append(queueName, queue);
  }

  /**
   * Build a short-prefix-style description made of the superclass description, every server
   * bucket with a non-zero size, and the table, peer and meta maps. The queues are read while
   * holding the scheduler lock.
   */
  @Override
  public String toString() {
    final ToStringBuilder out = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
    out.appendSuper(super.toString());
    schedLock();
    try {
      for (int idx = 0; idx < serverBuckets.length; idx++) {
        serverBucketToString(out, "serverBuckets[" + idx + "]", serverBuckets[idx]);
      }
      out.append("tableMap", tableMap).append("peerMap", peerMap).append("metaMap", metaMap);
    } finally {
      schedUnlock();
    }
    return out.build();
  }
}