001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.function.Function;
024import java.util.function.Supplier;
025import org.apache.commons.lang3.builder.ToStringBuilder;
026import org.apache.commons.lang3.builder.ToStringStyle;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.TableExistsException;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.TableNotFoundException;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
033import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
034import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
035import org.apache.hadoop.hbase.procedure2.LockAndQueue;
036import org.apache.hadoop.hbase.procedure2.LockStatus;
037import org.apache.hadoop.hbase.procedure2.LockedResource;
038import org.apache.hadoop.hbase.procedure2.LockedResourceType;
039import org.apache.hadoop.hbase.procedure2.Procedure;
040import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
041import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
042import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
043import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the
050 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the
051 * master operations can be executed concurrently, if they are operating on different tables (e.g.
052 * two create table procedures can be performed at the same time) or against two different servers;
053 * say two servers that crashed at about the same time.
054 * <p>
055 * Each procedure should implement an Interface providing information for this queue. For example
056 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed
057 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can
058 * abort all the operations preceding a delete table, or similar.
059 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue,
060 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly
061 * includes:<br>
062 * <ul>
063 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when:
064 * <ol>
065 * <li>Queue was empty before push (so must have been out of run-queue)</li>
066 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have
067 * moved Queue out of run-queue)</li>
068 * </ol>
069 * </li>
070 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when:
071 * <ol>
072 * <li>Queue becomes empty after poll</li>
073 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the
074 * procedure)</li>
075 * <li>Exclusive lock is requested but lock is not available (returns null)</li>
076 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a
077 * child</li>
078 * </ol>
079 * </li>
080 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
081 * <ol>
082 * <li>Exclusive lock</li>
083 * <li>Last shared lock (in case queue was removed because next procedure in queue required
084 * exclusive lock)</li>
085 * </ol>
086 * </li>
087 * </ul>
088 */
089@InterfaceAudience.Private
090public class MasterProcedureScheduler extends AbstractProcedureScheduler {
091  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
092
093  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
094    (n, k) -> n.compareKey((ServerName) k);
095  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
096    (n, k) -> n.compareKey((TableName) k);
097  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
098    (n, k) -> n.compareKey((String) k);
099  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
100    (n, k) -> n.compareKey((TableName) k);
101
102  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
103  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
104  private final FairQueue<String> peerRunQueue = new FairQueue<>();
105  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
106
107  private final ServerQueue[] serverBuckets = new ServerQueue[128];
108  private TableQueue tableMap = null;
109  private PeerQueue peerMap = null;
110  private MetaQueue metaMap = null;
111
112  private final SchemaLocking locking;
113
/**
 * Creates a scheduler whose lock bookkeeping is held by a freshly created {@link SchemaLocking}.
 * @param procedureRetriever passed straight to the {@link SchemaLocking} constructor
 */
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
  this.locking = new SchemaLocking(procedureRetriever);
}
117
118  @Override
119  public void yield(final Procedure proc) {
120    push(proc, false, true);
121  }
122
123  @Override
124  protected void enqueue(final Procedure proc, final boolean addFront) {
125    if (isMetaProcedure(proc)) {
126      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
127    } else if (isTableProcedure(proc)) {
128      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
129    } else if (isServerProcedure(proc)) {
130      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
131      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
132    } else if (isPeerProcedure(proc)) {
133      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
134    } else {
135      // TODO: at the moment we only have Table and Server procedures
136      // if you are implementing a non-table/non-server procedure, you have two options: create
137      // a group for all the non-table/non-server procedures or try to find a key for your
138      // non-table/non-server procedures and implement something similar to the TableRunQueue.
139      throw new UnsupportedOperationException(
140        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
141    }
142  }
143
144  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
145    Procedure<?> proc, boolean addFront) {
146    queue.add(proc, addFront);
147    // For the following conditions, we will put the queue back into execution
148    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
149    // which means it can be executed immediately.
150    // 2. The exclusive lock for this queue has not been held.
151    // 3. The given procedure has the exclusive lock permission for this queue.
152    Supplier<String> reason = null;
153    if (proc.hasLock()) {
154      reason = () -> proc + " has lock";
155    } else if (proc.isLockedWhenLoading()) {
156      reason = () -> proc + " restores lock when restarting";
157    } else if (!queue.getLockStatus().hasExclusiveLock()) {
158      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
159    } else if (queue.getLockStatus().hasLockAccess(proc)) {
160      reason = () -> proc + " has the excusive lock access";
161    }
162    if (reason != null) {
163      addToRunQueue(fairq, queue, reason);
164    }
165  }
166
167  @Override
168  protected boolean queueHasRunnables() {
169    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
170      || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
171  }
172
173  @Override
174  protected Procedure dequeue() {
175    // meta procedure is always the first priority
176    Procedure<?> pollResult = doPoll(metaRunQueue);
177    // For now, let server handling have precedence over table handling; presumption is that it
178    // is more important handling crashed servers than it is running the
179    // enabling/disabling tables, etc.
180    if (pollResult == null) {
181      pollResult = doPoll(serverRunQueue);
182    }
183    if (pollResult == null) {
184      pollResult = doPoll(peerRunQueue);
185    }
186    if (pollResult == null) {
187      pollResult = doPoll(tableRunQueue);
188    }
189    return pollResult;
190  }
191
192  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
193    LockStatus s = rq.getLockStatus();
194    // if we have the lock access, we are ready
195    if (s.hasLockAccess(proc)) {
196      return true;
197    }
198    boolean xlockReq = rq.requireExclusiveLock(proc);
199    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
200    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
201    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
202  }
203
204  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
205    Queue<T> rq = fairq.poll();
206    if (rq == null || !rq.isAvailable()) {
207      return null;
208    }
209    // loop until we find out a procedure which is ready to run, or if we have checked all the
210    // procedures, then we give up and remove the queue from run queue.
211    for (int i = 0, n = rq.size(); i < n; i++) {
212      Procedure<?> proc = rq.poll();
213      if (isLockReady(proc, rq)) {
214        // the queue is empty, remove from run queue
215        if (rq.isEmpty()) {
216          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
217        }
218        return proc;
219      }
220      // we are not ready to run, add back and try the next procedure
221      rq.add(proc, false);
222    }
223    // no procedure is ready for execution, remove from run queue
224    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
225    return null;
226  }
227
228  @Override
229  public List<LockedResource> getLocks() {
230    schedLock();
231    try {
232      return locking.getLocks();
233    } finally {
234      schedUnlock();
235    }
236  }
237
238  @Override
239  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
240    schedLock();
241    try {
242      return locking.getLockResource(resourceType, resourceName);
243    } finally {
244      schedUnlock();
245    }
246  }
247
248  @Override
249  public void clear() {
250    schedLock();
251    try {
252      clearQueue();
253      locking.clear();
254    } finally {
255      schedUnlock();
256    }
257  }
258
259  private void clearQueue() {
260    // Remove Servers
261    for (int i = 0; i < serverBuckets.length; ++i) {
262      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
263      serverBuckets[i] = null;
264    }
265
266    // Remove Tables
267    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
268    tableMap = null;
269
270    // Remove Peers
271    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
272    peerMap = null;
273
274    assert size() == 0 : "expected queue size to be 0, got " + size();
275  }
276
277  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
278    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
279    while (treeMap != null) {
280      Queue<T> node = AvlTree.getFirst(treeMap);
281      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
282      if (fairq != null) {
283        removeFromRunQueue(fairq, node, () -> "clear all queues");
284      }
285    }
286  }
287
288  private int queueSize(Queue<?> head) {
289    int count = 0;
290    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
291    while (iter.hasNext()) {
292      count += iter.next().size();
293    }
294    return count;
295  }
296
297  @Override
298  protected int queueSize() {
299    int count = 0;
300    for (ServerQueue serverMap : serverBuckets) {
301      count += queueSize(serverMap);
302    }
303    count += queueSize(tableMap);
304    count += queueSize(peerMap);
305    count += queueSize(metaMap);
306    return count;
307  }
308
309  @Override
310  public void completionCleanup(final Procedure proc) {
311    if (proc instanceof TableProcedureInterface) {
312      TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
313      boolean tableDeleted;
314      if (proc.hasException()) {
315        Exception procEx = proc.getException().unwrapRemoteException();
316        if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
317          // create failed because the table already exist
318          tableDeleted = !(procEx instanceof TableExistsException);
319        } else {
320          // the operation failed because the table does not exist
321          tableDeleted = (procEx instanceof TableNotFoundException);
322        }
323      } else {
324        // the table was deleted
325        tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
326      }
327      if (tableDeleted) {
328        markTableAsDeleted(iProcTable.getTableName(), proc);
329        return;
330      }
331    } else if (proc instanceof PeerProcedureInterface) {
332      tryCleanupPeerQueue(getPeerId(proc), proc);
333    } else if (proc instanceof ServerProcedureInterface) {
334      tryCleanupServerQueue(getServerName(proc), proc);
335    } else {
336      // No cleanup for other procedure types, yet.
337      return;
338    }
339  }
340
341  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
342    Supplier<String> reason) {
343    if (LOG.isTraceEnabled()) {
344      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
345    }
346    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
347      fairq.add(queue);
348    }
349  }
350
351  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
352    Queue<T> queue, Supplier<String> reason) {
353    if (LOG.isTraceEnabled()) {
354      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
355    }
356    if (AvlIterableList.isLinked(queue)) {
357      fairq.remove(queue);
358    }
359  }
360
361  // ============================================================================
362  // Table Queue Lookup Helpers
363  // ============================================================================
364  private TableQueue getTableQueue(TableName tableName) {
365    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
366    if (node != null) return node;
367
368    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
369      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
370    tableMap = AvlTree.insert(tableMap, node);
371    return node;
372  }
373
374  private void removeTableQueue(TableName tableName) {
375    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
376    locking.removeTableLock(tableName);
377  }
378
379  private static boolean isTableProcedure(Procedure<?> proc) {
380    return proc instanceof TableProcedureInterface;
381  }
382
383  private static TableName getTableName(Procedure<?> proc) {
384    return ((TableProcedureInterface) proc).getTableName();
385  }
386
387  // ============================================================================
388  // Server Queue Lookup Helpers
389  // ============================================================================
390  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
391    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
392    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
393    if (node != null) {
394      return node;
395    }
396    int priority;
397    if (proc != null) {
398      priority = MasterProcedureUtil.getServerPriority(proc);
399    } else {
400      priority = 1;
401    }
402    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
403    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
404    return node;
405  }
406
407  private void removeServerQueue(ServerName serverName) {
408    int index = getBucketIndex(serverBuckets, serverName.hashCode());
409    serverBuckets[index] =
410      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
411    locking.removeServerLock(serverName);
412  }
413
414  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
415    schedLock();
416    try {
417      int index = getBucketIndex(serverBuckets, serverName.hashCode());
418      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
419      if (node == null) {
420        return;
421      }
422
423      LockAndQueue lock = locking.getServerLock(serverName);
424      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
425        removeFromRunQueue(serverRunQueue, node,
426          () -> "clean up server queue after " + proc + " completed");
427        removeServerQueue(serverName);
428      }
429    } finally {
430      schedUnlock();
431    }
432  }
433
434  private static int getBucketIndex(Object[] buckets, int hashCode) {
435    return Math.abs(hashCode) % buckets.length;
436  }
437
438  private static boolean isServerProcedure(Procedure<?> proc) {
439    return proc instanceof ServerProcedureInterface;
440  }
441
442  private static ServerName getServerName(Procedure<?> proc) {
443    return ((ServerProcedureInterface) proc).getServerName();
444  }
445
446  // ============================================================================
447  // Peer Queue Lookup Helpers
448  // ============================================================================
449  private PeerQueue getPeerQueue(String peerId) {
450    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
451    if (node != null) {
452      return node;
453    }
454    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
455    peerMap = AvlTree.insert(peerMap, node);
456    return node;
457  }
458
459  private void removePeerQueue(String peerId) {
460    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
461    locking.removePeerLock(peerId);
462  }
463
464  private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
465    schedLock();
466    try {
467      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
468      if (queue == null) {
469        return;
470      }
471
472      final LockAndQueue lock = locking.getPeerLock(peerId);
473      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
474        removeFromRunQueue(peerRunQueue, queue,
475          () -> "clean up peer queue after " + procedure + " completed");
476        removePeerQueue(peerId);
477      }
478    } finally {
479      schedUnlock();
480    }
481  }
482
483  private static boolean isPeerProcedure(Procedure<?> proc) {
484    return proc instanceof PeerProcedureInterface;
485  }
486
487  private static String getPeerId(Procedure<?> proc) {
488    return ((PeerProcedureInterface) proc).getPeerId();
489  }
490
491  // ============================================================================
492  // Meta Queue Lookup Helpers
493  // ============================================================================
494  private MetaQueue getMetaQueue() {
495    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
496    if (node != null) {
497      return node;
498    }
499    node = new MetaQueue(locking.getMetaLock());
500    metaMap = AvlTree.insert(metaMap, node);
501    return node;
502  }
503
504  private static boolean isMetaProcedure(Procedure<?> proc) {
505    return proc instanceof MetaProcedureInterface;
506  }
507
508  // ============================================================================
509  // Table Locking Helpers
510  // ============================================================================
511  /**
512   * Get lock info for a resource of specified type and name and log details
513   */
514  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
515    if (!LOG.isDebugEnabled()) {
516      return;
517    }
518
519    LockedResource lockedResource = getLockResource(resourceType, resourceName);
520    if (lockedResource != null) {
521      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
522        + lockedResource.getSharedLockCount();
523
524      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
525      if (proc != null) {
526        msg += ", exclusively locked by procId=" + proc.getProcId();
527      }
528      LOG.debug(msg);
529    }
530  }
531
532  /**
533   * Suspend the procedure if the specified table is already locked. Other operations in the
534   * table-queue will be executed after the lock is released.
535   * @param procedure the procedure trying to acquire the lock
536   * @param table     Table to lock
537   * @return true if the procedure has to wait for the table to be available
538   */
539  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
540    schedLock();
541    try {
542      final String namespace = table.getNamespaceAsString();
543      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
544      final LockAndQueue tableLock = locking.getTableLock(table);
545      if (!namespaceLock.trySharedLock(procedure)) {
546        waitProcedure(namespaceLock, procedure);
547        logLockedResource(LockedResourceType.NAMESPACE, namespace);
548        return true;
549      }
550      if (!tableLock.tryExclusiveLock(procedure)) {
551        namespaceLock.releaseSharedLock();
552        waitProcedure(tableLock, procedure);
553        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
554        return true;
555      }
556      removeFromRunQueue(tableRunQueue, getTableQueue(table),
557        () -> procedure + " held the exclusive lock");
558      return false;
559    } finally {
560      schedUnlock();
561    }
562  }
563
564  /**
565   * Wake the procedures waiting for the specified table
566   * @param procedure the procedure releasing the lock
567   * @param table     the name of the table that has the exclusive lock
568   */
569  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
570    schedLock();
571    try {
572      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
573      final LockAndQueue tableLock = locking.getTableLock(table);
574      int waitingCount = 0;
575      if (tableLock.releaseExclusiveLock(procedure)) {
576        waitingCount += wakeWaitingProcedures(tableLock);
577      }
578      if (namespaceLock.releaseSharedLock()) {
579        waitingCount += wakeWaitingProcedures(namespaceLock);
580      }
581      addToRunQueue(tableRunQueue, getTableQueue(table),
582        () -> procedure + " released the exclusive lock");
583      wakePollIfNeeded(waitingCount);
584    } finally {
585      schedUnlock();
586    }
587  }
588
589  /**
590   * Suspend the procedure if the specified table is already locked. other "read" operations in the
591   * table-queue may be executed concurrently,
592   * @param procedure the procedure trying to acquire the lock
593   * @param table     Table to lock
594   * @return true if the procedure has to wait for the table to be available
595   */
596  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
597    return waitTableQueueSharedLock(procedure, table) == null;
598  }
599
600  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
601    schedLock();
602    try {
603      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
604      final LockAndQueue tableLock = locking.getTableLock(table);
605      if (!namespaceLock.trySharedLock(procedure)) {
606        waitProcedure(namespaceLock, procedure);
607        return null;
608      }
609
610      if (!tableLock.trySharedLock(procedure)) {
611        namespaceLock.releaseSharedLock();
612        waitProcedure(tableLock, procedure);
613        return null;
614      }
615
616      return getTableQueue(table);
617    } finally {
618      schedUnlock();
619    }
620  }
621
622  /**
623   * Wake the procedures waiting for the specified table
624   * @param procedure the procedure releasing the lock
625   * @param table     the name of the table that has the shared lock
626   */
627  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
628    schedLock();
629    try {
630      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
631      final LockAndQueue tableLock = locking.getTableLock(table);
632      int waitingCount = 0;
633      if (tableLock.releaseSharedLock()) {
634        addToRunQueue(tableRunQueue, getTableQueue(table),
635          () -> procedure + " released the shared lock");
636        waitingCount += wakeWaitingProcedures(tableLock);
637      }
638      if (namespaceLock.releaseSharedLock()) {
639        waitingCount += wakeWaitingProcedures(namespaceLock);
640      }
641      wakePollIfNeeded(waitingCount);
642    } finally {
643      schedUnlock();
644    }
645  }
646
647  /**
648   * Tries to remove the queue and the table-lock of the specified table. If there are new
649   * operations pending (e.g. a new create), the remove will not be performed.
650   * @param table     the name of the table that should be marked as deleted
651   * @param procedure the procedure that is removing the table
652   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
653   *         pending for that table (e.g. a new create).
654   */
655  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
656    schedLock();
657    try {
658      final TableQueue queue = getTableQueue(table);
659      final LockAndQueue tableLock = locking.getTableLock(table);
660      if (queue == null) return true;
661
662      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
663        // remove the table from the run-queue and the map
664        if (AvlIterableList.isLinked(queue)) {
665          tableRunQueue.remove(queue);
666        }
667        removeTableQueue(table);
668      } else {
669        // TODO: If there are no create, we can drop all the other ops
670        return false;
671      }
672    } finally {
673      schedUnlock();
674    }
675    return true;
676  }
677
678  // ============================================================================
679  // Region Locking Helpers
680  // ============================================================================
681  /**
682   * Suspend the procedure if the specified region is already locked.
683   * @param procedure  the procedure trying to acquire the lock on the region
684   * @param regionInfo the region we are trying to lock
685   * @return true if the procedure has to wait for the regions to be available
686   */
687  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
688    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
689  }
690
691  /**
692   * Suspend the procedure if the specified set of regions are already locked.
693   * @param procedure   the procedure trying to acquire the lock on the regions
694   * @param table       the table name of the regions we are trying to lock
695   * @param regionInfos the list of regions we are trying to lock
696   * @return true if the procedure has to wait for the regions to be available
697   */
698  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
699    final RegionInfo... regionInfos) {
700    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
701    schedLock();
702    try {
703      assert table != null;
704      if (waitTableSharedLock(procedure, table)) {
705        return true;
706      }
707
708      // acquire region xlocks or wait
709      boolean hasLock = true;
710      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
711      for (int i = 0; i < regionInfos.length; ++i) {
712        assert regionInfos[i] != null;
713        assert regionInfos[i].getTable() != null;
714        assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure;
715        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
716          : "duplicate region: " + regionInfos[i];
717
718        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
719        if (!regionLocks[i].tryExclusiveLock(procedure)) {
720          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
721            regionLocks[i].getExclusiveLockProcIdOwner());
722          waitProcedure(regionLocks[i], procedure);
723          hasLock = false;
724          while (i-- > 0) {
725            regionLocks[i].releaseExclusiveLock(procedure);
726          }
727          break;
728        } else {
729          LOG.info("Took xlock for {}", procedure);
730        }
731      }
732
733      if (!hasLock) {
734        wakeTableSharedLock(procedure, table);
735      }
736      return !hasLock;
737    } finally {
738      schedUnlock();
739    }
740  }
741
742  /**
743   * Wake the procedures waiting for the specified region
744   * @param procedure  the procedure that was holding the region
745   * @param regionInfo the region the procedure was holding
746   */
747  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
748    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
749  }
750
751  /**
752   * Wake the procedures waiting for the specified regions
753   * @param procedure   the procedure that was holding the regions
754   * @param regionInfos the list of regions the procedure was holding
755   */
756  public void wakeRegions(final Procedure<?> procedure, final TableName table,
757    final RegionInfo... regionInfos) {
758    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
759    schedLock();
760    try {
761      int numProcs = 0;
762      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
763      for (int i = 0; i < regionInfos.length; ++i) {
764        assert regionInfos[i].getTable().equals(table);
765        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
766          : "duplicate region: " + regionInfos[i];
767
768        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
769        if (regionLock.releaseExclusiveLock(procedure)) {
770          if (!regionLock.isWaitingQueueEmpty()) {
771            // release one procedure at the time since regions has an xlock
772            nextProcs[numProcs++] = regionLock.removeFirst();
773          } else {
774            locking.removeRegionLock(regionInfos[i].getEncodedName());
775          }
776        }
777      }
778
779      // awake procedures if any
780      for (int i = numProcs - 1; i >= 0; --i) {
781        wakeProcedure(nextProcs[i]);
782      }
783      wakePollIfNeeded(numProcs);
784      // release the table shared-lock.
785      wakeTableSharedLock(procedure, table);
786    } finally {
787      schedUnlock();
788    }
789  }
790
791  // ============================================================================
792  // Namespace Locking Helpers
793  // ============================================================================
794  /**
795   * Suspend the procedure if the specified namespace is already locked.
796   * @see #wakeNamespaceExclusiveLock(Procedure,String)
797   * @param procedure the procedure trying to acquire the lock
798   * @param namespace Namespace to lock
799   * @return true if the procedure has to wait for the namespace to be available
800   */
801  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
802    schedLock();
803    try {
804      final LockAndQueue systemNamespaceTableLock =
805        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
806      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
807        waitProcedure(systemNamespaceTableLock, procedure);
808        logLockedResource(LockedResourceType.TABLE,
809          TableName.NAMESPACE_TABLE_NAME.getNameAsString());
810        return true;
811      }
812
813      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
814      if (!namespaceLock.tryExclusiveLock(procedure)) {
815        systemNamespaceTableLock.releaseSharedLock();
816        waitProcedure(namespaceLock, procedure);
817        logLockedResource(LockedResourceType.NAMESPACE, namespace);
818        return true;
819      }
820      return false;
821    } finally {
822      schedUnlock();
823    }
824  }
825
826  /**
827   * Wake the procedures waiting for the specified namespace
828   * @see #waitNamespaceExclusiveLock(Procedure,String)
829   * @param procedure the procedure releasing the lock
830   * @param namespace the namespace that has the exclusive lock
831   */
832  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
833    schedLock();
834    try {
835      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
836      final LockAndQueue systemNamespaceTableLock =
837        locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
838      int waitingCount = 0;
839      if (namespaceLock.releaseExclusiveLock(procedure)) {
840        waitingCount += wakeWaitingProcedures(namespaceLock);
841      }
842      if (systemNamespaceTableLock.releaseSharedLock()) {
843        addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME),
844          () -> procedure + " released namespace exclusive lock");
845        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
846      }
847      wakePollIfNeeded(waitingCount);
848    } finally {
849      schedUnlock();
850    }
851  }
852
853  // ============================================================================
854  // Server Locking Helpers
855  // ============================================================================
856  /**
857   * Try to acquire the exclusive lock on the specified server.
858   * @see #wakeServerExclusiveLock(Procedure,ServerName)
859   * @param procedure  the procedure trying to acquire the lock
860   * @param serverName Server to lock
861   * @return true if the procedure has to wait for the server to be available
862   */
863  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
864    final ServerName serverName) {
865    schedLock();
866    try {
867      final LockAndQueue lock = locking.getServerLock(serverName);
868      if (lock.tryExclusiveLock(procedure)) {
869        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
870        // so.
871        removeFromRunQueue(serverRunQueue,
872          getServerQueue(serverName,
873            procedure instanceof ServerProcedureInterface
874              ? (ServerProcedureInterface) procedure
875              : null),
876          () -> procedure + " held exclusive lock");
877        return false;
878      }
879      waitProcedure(lock, procedure);
880      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
881      return true;
882    } finally {
883      schedUnlock();
884    }
885  }
886
887  /**
888   * Wake the procedures waiting for the specified server
889   * @see #waitServerExclusiveLock(Procedure,ServerName)
890   * @param procedure  the procedure releasing the lock
891   * @param serverName the server that has the exclusive lock
892   */
893  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
894    schedLock();
895    try {
896      final LockAndQueue lock = locking.getServerLock(serverName);
897      // Only SCP will acquire/release server lock so do not need to check the return value here.
898      lock.releaseExclusiveLock(procedure);
899      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
900      // so.
901      addToRunQueue(serverRunQueue,
902        getServerQueue(serverName,
903          procedure instanceof ServerProcedureInterface
904            ? (ServerProcedureInterface) procedure
905            : null),
906        () -> procedure + " released exclusive lock");
907      int waitingCount = wakeWaitingProcedures(lock);
908      wakePollIfNeeded(waitingCount);
909    } finally {
910      schedUnlock();
911    }
912  }
913
914  // ============================================================================
915  // Peer Locking Helpers
916  // ============================================================================
917  private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
918    return proc.getPeerOperationType() != PeerOperationType.REFRESH;
919  }
920
921  /**
922   * Try to acquire the exclusive lock on the specified peer.
923   * @see #wakePeerExclusiveLock(Procedure, String)
924   * @param procedure the procedure trying to acquire the lock
925   * @param peerId    peer to lock
926   * @return true if the procedure has to wait for the peer to be available
927   */
928  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
929    schedLock();
930    try {
931      final LockAndQueue lock = locking.getPeerLock(peerId);
932      if (lock.tryExclusiveLock(procedure)) {
933        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
934          () -> procedure + " held exclusive lock");
935        return false;
936      }
937      waitProcedure(lock, procedure);
938      logLockedResource(LockedResourceType.PEER, peerId);
939      return true;
940    } finally {
941      schedUnlock();
942    }
943  }
944
945  /**
946   * Wake the procedures waiting for the specified peer
947   * @see #waitPeerExclusiveLock(Procedure, String)
948   * @param procedure the procedure releasing the lock
949   * @param peerId    the peer that has the exclusive lock
950   */
951  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
952    schedLock();
953    try {
954      final LockAndQueue lock = locking.getPeerLock(peerId);
955      if (lock.releaseExclusiveLock(procedure)) {
956        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
957          () -> procedure + " released exclusive lock");
958        int waitingCount = wakeWaitingProcedures(lock);
959        wakePollIfNeeded(waitingCount);
960      }
961    } finally {
962      schedUnlock();
963    }
964  }
965
966  // ============================================================================
967  // Meta Locking Helpers
968  // ============================================================================
969  /**
970   * Try to acquire the exclusive lock on meta.
971   * @see #wakeMetaExclusiveLock(Procedure)
972   * @param procedure the procedure trying to acquire the lock
973   * @return true if the procedure has to wait for meta to be available
974   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
975   *             {@link RecoverMetaProcedure}.
976   */
977  @Deprecated
978  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
979    schedLock();
980    try {
981      final LockAndQueue lock = locking.getMetaLock();
982      if (lock.tryExclusiveLock(procedure)) {
983        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
984        return false;
985      }
986      waitProcedure(lock, procedure);
987      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
988      return true;
989    } finally {
990      schedUnlock();
991    }
992  }
993
994  /**
995   * Wake the procedures waiting for meta.
996   * @see #waitMetaExclusiveLock(Procedure)
997   * @param procedure the procedure releasing the lock
998   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
999   *             {@link RecoverMetaProcedure}.
1000   */
1001  @Deprecated
1002  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1003    schedLock();
1004    try {
1005      final LockAndQueue lock = locking.getMetaLock();
1006      lock.releaseExclusiveLock(procedure);
1007      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1008      int waitingCount = wakeWaitingProcedures(lock);
1009      wakePollIfNeeded(waitingCount);
1010    } finally {
1011      schedUnlock();
1012    }
1013  }
1014
1015  /**
1016   * For debugging. Expensive.
1017   */
1018  public String dumpLocks() throws IOException {
1019    schedLock();
1020    try {
1021      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1022      return this.locking.toString();
1023    } finally {
1024      schedUnlock();
1025    }
1026  }
1027
1028  private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) {
1029    int size = queueSize(queue);
1030    if (size != 0) {
1031      builder.append(queueName, queue);
1032    }
1033  }
1034
1035  @Override
1036  public String toString() {
1037    ToStringBuilder builder =
1038      new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString());
1039    schedLock();
1040    try {
1041      for (int i = 0; i < serverBuckets.length; i++) {
1042        serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]);
1043      }
1044      builder.append("tableMap", tableMap);
1045      builder.append("peerMap", peerMap);
1046      builder.append("metaMap", metaMap);
1047    } finally {
1048      schedUnlock();
1049    }
1050    return builder.build();
1051  }
1052}