001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.function.Function;
027import java.util.function.Supplier;
028import org.apache.commons.lang3.builder.ToStringBuilder;
029import org.apache.commons.lang3.builder.ToStringStyle;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.TableExistsException;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.TableNotFoundException;
035import org.apache.hadoop.hbase.client.RegionInfo;
036import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
037import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
038import org.apache.hadoop.hbase.procedure2.LockAndQueue;
039import org.apache.hadoop.hbase.procedure2.LockStatus;
040import org.apache.hadoop.hbase.procedure2.LockedResource;
041import org.apache.hadoop.hbase.procedure2.LockedResourceType;
042import org.apache.hadoop.hbase.procedure2.Procedure;
043import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
044import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
045import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
046import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * ProcedureScheduler for the Master Procedures. This ProcedureScheduler tries to provide to the
053 * ProcedureExecutor procedures that can be executed without having to wait on a lock. Most of the
054 * master operations can be executed concurrently, if they are operating on different tables (e.g.
055 * two create table procedures can be performed at the same time) or against two different servers;
056 * say two servers that crashed at about the same time.
057 * <p>
058 * Each procedure should implement an Interface providing information for this queue. For example
059 * table related procedures should implement TableProcedureInterface. Each procedure will be pushed
060 * in its own queue, and based on the operation type we may make smarter decisions: e.g. we can
061 * abort all the operations preceding a delete table, or similar.
062 * <h4>Concurrency control</h4> Concurrent access to member variables (tableRunQueue,
063 * serverRunQueue, locking, tableMap, serverBuckets) is controlled by schedLock(). This mainly
064 * includes:<br>
065 * <ul>
066 * <li>{@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue when:
067 * <ol>
068 * <li>Queue was empty before push (so must have been out of run-queue)</li>
069 * <li>Child procedure is added (which means parent procedure holds exclusive lock, and it must have
070 * moved Queue out of run-queue)</li>
071 * </ol>
072 * </li>
073 * <li>{@link #poll(long)}: A poll will remove a Queue from run-queue when:
074 * <ol>
075 * <li>Queue becomes empty after poll</li>
076 * <li>Exclusive lock is requested by polled procedure and lock is available (returns the
077 * procedure)</li>
078 * <li>Exclusive lock is requested but lock is not available (returns null)</li>
079 * <li>Polled procedure is child of parent holding exclusive lock and the next procedure is not a
080 * child</li>
081 * </ol>
082 * </li>
083 * <li>Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
084 * <ol>
085 * <li>Exclusive lock</li>
086 * <li>Last shared lock (in case queue was removed because next procedure in queue required
087 * exclusive lock)</li>
088 * </ol>
089 * </li>
090 * </ul>
091 */
092@InterfaceAudience.Private
093public class MasterProcedureScheduler extends AbstractProcedureScheduler {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);

  // Key comparators for the per-type AVL trees below: each one casts the opaque lookup key to the
  // key type of its queue (ServerName, TableName or a String id) and delegates to compareKey.
  private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((ServerName) k);
  private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((TableName) k);
  private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((String) k);
  private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((TableName) k);
  private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
    (n, k) -> n.compareKey((String) k);

  // Run queues, one per procedure type: they hold the queues that are currently candidates for
  // being polled by dequeue().
  private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
  private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
  private final FairQueue<String> peerRunQueue = new FairQueue<>();
  private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
  private final FairQueue<String> globalRunQueue = new FairQueue<>();

  // Queue storage. Server queues are spread over a fixed array of buckets, indexed from the
  // server name's hash code, each bucket holding the root of an AVL tree. The other fields each
  // hold the root of a single AVL tree; they start out null and clearQueue() resets them to null.
  private final ServerQueue[] serverBuckets = new ServerQueue[128];
  private TableQueue tableMap = null;
  private PeerQueue peerMap = null;
  private MetaQueue metaMap = null;
  private GlobalQueue globalMap = null;

  // Resolves a procedure id to its procedure; also handed to SchemaLocking and to each
  // TableProcedureWaitingQueue created in enqueue().
  private final Function<Long, Procedure<?>> procedureRetriever;
  // Namespace, table, server, peer, meta and global locks backing the queues and lock helpers.
  private final SchemaLocking locking;

  // To prevent multiple Create/Modify/Disable/Enable table procedures from running at the same
  // time, such table procedures are kept in this per-table waiting queue first, before actually
  // being enqueued to the table queue.
  // See HBASE-28683 for more details
  private final Map<TableName, TableProcedureWaitingQueue> tableProcsWaitingEnqueue =
    new HashMap<>();
127
128  public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
129    this.procedureRetriever = procedureRetriever;
130    locking = new SchemaLocking(procedureRetriever);
131  }
132
133  @Override
134  public void yield(final Procedure proc) {
135    push(proc, false, true);
136  }
137
138  private boolean shouldWaitBeforeEnqueuing(TableProcedureInterface proc) {
139    return TableQueue.requireTableExclusiveLock(proc);
140  }
141
  /**
   * Adds a procedure to the queue matching its type. The type checks are evaluated in order
   * (meta, table, server, peer, global), so a procedure implementing several of these interfaces
   * goes to the first matching queue.
   * <p>
   * Table procedures that require the table exclusive lock are first registered in the per-table
   * waiting queue; when {@code procedureSubmitted} returns false (another such procedure for the
   * table is already outstanding) the procedure stays there and is not added to the table queue.
   * @param proc     the procedure to enqueue
   * @param addFront whether to add the procedure at the front of its queue
   * @throws UnsupportedOperationException if the procedure matches none of the supported types
   */
  @Override
  protected void enqueue(final Procedure proc, final boolean addFront) {
    if (isMetaProcedure(proc)) {
      doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
    } else if (isTableProcedure(proc)) {
      TableProcedureInterface tableProc = (TableProcedureInterface) proc;
      if (shouldWaitBeforeEnqueuing(tableProc)) {
        TableProcedureWaitingQueue waitingQueue = tableProcsWaitingEnqueue.computeIfAbsent(
          tableProc.getTableName(), k -> new TableProcedureWaitingQueue(procedureRetriever));
        if (!waitingQueue.procedureSubmitted(proc)) {
          // there is a table procedure for this table already enqueued, waiting
          LOG.debug("There is already a procedure running for table {}, added {} to waiting queue",
            tableProc.getTableName(), proc);
          return;
        }
      }
      doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
    } else if (isServerProcedure(proc)) {
      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
    } else if (isPeerProcedure(proc)) {
      doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
    } else if (isGlobalProcedure(proc)) {
      doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
    } else {
      // TODO: only meta, table, server, peer and global procedures have run queues. A procedure of
      // any other kind needs either a shared group for all such procedures, or its own key and a
      // queue implementation similar to the ones above.
      throw new UnsupportedOperationException(
        "RQs for non-table/non-server procedures are not implemented yet: " + proc);
    }
  }
175
176  private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
177    Procedure<?> proc, boolean addFront) {
178    queue.add(proc, addFront);
179    // For the following conditions, we will put the queue back into execution
180    // 1. The procedure has already held the lock, or the lock has been restored when restarting,
181    // which means it can be executed immediately.
182    // 2. The exclusive lock for this queue has not been held.
183    // 3. The given procedure has the exclusive lock permission for this queue.
184    Supplier<String> reason = null;
185    if (proc.hasLock()) {
186      reason = () -> proc + " has lock";
187    } else if (proc.isLockedWhenLoading()) {
188      reason = () -> proc + " restores lock when restarting";
189    } else if (!queue.getLockStatus().hasExclusiveLock()) {
190      reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
191    } else if (queue.getLockStatus().hasLockAccess(proc)) {
192      reason = () -> proc + " has the excusive lock access";
193    }
194    if (reason != null) {
195      addToRunQueue(fairq, queue, reason);
196    }
197  }
198
199  @Override
200  protected boolean queueHasRunnables() {
201    return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
202      || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
203      || peerRunQueue.hasRunnables();
204  }
205
206  @Override
207  protected Procedure dequeue() {
208    // pull global first
209    Procedure<?> pollResult = doPoll(globalRunQueue);
210    // then meta procedure
211    if (pollResult == null) {
212      pollResult = doPoll(metaRunQueue);
213    }
214    // For now, let server handling have precedence over table handling; presumption is that it
215    // is more important handling crashed servers than it is running the
216    // enabling/disabling tables, etc.
217    if (pollResult == null) {
218      pollResult = doPoll(serverRunQueue);
219    }
220    if (pollResult == null) {
221      pollResult = doPoll(peerRunQueue);
222    }
223    if (pollResult == null) {
224      pollResult = doPoll(tableRunQueue);
225    }
226    return pollResult;
227  }
228
229  private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
230    LockStatus s = rq.getLockStatus();
231    // if we have the lock access, we are ready
232    if (s.hasLockAccess(proc)) {
233      return true;
234    }
235    boolean xlockReq = rq.requireExclusiveLock(proc);
236    // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
237    // the shared lock, otherwise, we just need to make sure that no one holds the xlock
238    return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
239  }
240
  /**
   * Polls one procedure that is ready to run from the next queue of the given run queue.
   * <p>
   * The procedures of the polled queue are inspected in order; a procedure whose lock is not ready
   * is added back to the queue (not at the front). The queue is removed from the run queue when it
   * becomes empty after polling, or when none of its procedures is ready.
   * @param fairq the run queue to poll
   * @return a procedure ready to run, or null if the run queue yields no queue, the queue is not
   *         available, or none of its procedures is ready
   */
  private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
    Queue<T> rq = fairq.poll();
    if (rq == null || !rq.isAvailable()) {
      return null;
    }
    // loop until we find out a procedure which is ready to run, or if we have checked all the
    // procedures, then we give up and remove the queue from run queue.
    // n is captured once so each procedure is inspected at most once even though rejected ones
    // are added back to the queue.
    for (int i = 0, n = rq.size(); i < n; i++) {
      Procedure<?> proc = rq.poll();
      if (isLockReady(proc, rq)) {
        // the queue is empty, remove from run queue
        if (rq.isEmpty()) {
          removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
        }
        return proc;
      }
      // we are not ready to run, add back and try the next procedure
      rq.add(proc, false);
    }
    // no procedure is ready for execution, remove from run queue
    removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
    return null;
  }
264
265  @Override
266  public List<LockedResource> getLocks() {
267    schedLock();
268    try {
269      return locking.getLocks();
270    } finally {
271      schedUnlock();
272    }
273  }
274
275  @Override
276  public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
277    schedLock();
278    try {
279      return locking.getLockResource(resourceType, resourceName);
280    } finally {
281      schedUnlock();
282    }
283  }
284
  /**
   * Drops every queue (including the per-table waiting queues) and then clears the schema
   * locking, all while holding the scheduler lock.
   */
  @Override
  public void clear() {
    schedLock();
    try {
      clearQueue();
      locking.clear();
    } finally {
      schedUnlock();
    }
  }
295
  /**
   * Removes every queue node from the server buckets and the table, peer, meta and global trees,
   * unlinking each one from its run queue, and resets the tree roots to null. Also drops the
   * per-table waiting queues. Callers hold the scheduler lock.
   */
  private void clearQueue() {
    // Remove Servers
    for (int i = 0; i < serverBuckets.length; ++i) {
      clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
      serverBuckets[i] = null;
    }

    // Remove Tables
    clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
    tableMap = null;
    tableProcsWaitingEnqueue.clear();

    // Remove Peers
    clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
    peerMap = null;

    // Remove Meta
    clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
    metaMap = null;

    // Remove Global
    clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
    globalMap = null;

    // Sanity check (only evaluated with assertions enabled).
    assert size() == 0 : "expected queue size to be 0, got " + size();
  }
322
323  private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
324    FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
325    while (treeMap != null) {
326      Queue<T> node = AvlTree.getFirst(treeMap);
327      treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
328      if (fairq != null) {
329        removeFromRunQueue(fairq, node, () -> "clear all queues");
330      }
331    }
332  }
333
334  private int queueSize(Queue<?> head) {
335    int count = 0;
336    AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
337    while (iter.hasNext()) {
338      count += iter.next().size();
339    }
340    return count;
341  }
342
343  @Override
344  protected int queueSize() {
345    int count = 0;
346    for (ServerQueue serverMap : serverBuckets) {
347      count += queueSize(serverMap);
348    }
349    count += queueSize(tableMap);
350    count += queueSize(peerMap);
351    count += queueSize(metaMap);
352    count += queueSize(globalMap);
353    for (TableProcedureWaitingQueue waitingQ : tableProcsWaitingEnqueue.values()) {
354      count += waitingQ.waitingSize();
355    }
356    return count;
357  }
358
359  @Override
360  public void completionCleanup(final Procedure proc) {
361    if (isTableProcedure(proc)) {
362      TableProcedureInterface tableProc = (TableProcedureInterface) proc;
363      if (shouldWaitBeforeEnqueuing(tableProc)) {
364        schedLock();
365        try {
366          TableProcedureWaitingQueue waitingQueue =
367            tableProcsWaitingEnqueue.get(tableProc.getTableName());
368          if (waitingQueue != null) {
369            Optional<Procedure<?>> nextProc = waitingQueue.procedureCompleted(proc);
370            if (nextProc.isPresent()) {
371              // enqueue it
372              Procedure<?> next = nextProc.get();
373              LOG.debug("{} completed, enqueue a new procedure {}", proc, next);
374              doAdd(tableRunQueue, getTableQueue(tableProc.getTableName()), next, false);
375            } else {
376              if (waitingQueue.isEmpty()) {
377                // there is no waiting procedures in it, remove
378                tableProcsWaitingEnqueue.remove(tableProc.getTableName());
379              }
380            }
381          } else {
382            // this should not happen normally, warn it
383            LOG.warn("no waiting queue while completing {}, which should not happen", proc);
384          }
385        } finally {
386          schedUnlock();
387        }
388      }
389      boolean tableDeleted;
390      if (proc.hasException()) {
391        Exception procEx = proc.getException().unwrapRemoteException();
392        if (tableProc.getTableOperationType() == TableOperationType.CREATE) {
393          // create failed because the table already exist
394          tableDeleted = !(procEx instanceof TableExistsException);
395        } else {
396          // the operation failed because the table does not exist
397          tableDeleted = (procEx instanceof TableNotFoundException);
398        }
399      } else {
400        // the table was deleted
401        tableDeleted = (tableProc.getTableOperationType() == TableOperationType.DELETE);
402      }
403      if (tableDeleted) {
404        markTableAsDeleted(tableProc.getTableName(), proc);
405      }
406    } else if (proc instanceof PeerProcedureInterface) {
407      tryCleanupPeerQueue(getPeerId(proc), proc);
408    } else if (proc instanceof ServerProcedureInterface) {
409      tryCleanupServerQueue(getServerName(proc), proc);
410    } else {
411      // No cleanup for other procedure types, yet.
412      return;
413    }
414  }
415
416  private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
417    Supplier<String> reason) {
418    if (LOG.isTraceEnabled()) {
419      LOG.trace("Add {} to run queue because: {}", queue, reason.get());
420    }
421    if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
422      fairq.add(queue);
423    }
424  }
425
426  private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
427    Queue<T> queue, Supplier<String> reason) {
428    if (LOG.isTraceEnabled()) {
429      LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
430    }
431    if (AvlIterableList.isLinked(queue)) {
432      fairq.remove(queue);
433    }
434  }
435
436  // ============================================================================
437  // Table Queue Lookup Helpers
438  // ============================================================================
439  private TableQueue getTableQueue(TableName tableName) {
440    TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
441    if (node != null) return node;
442
443    node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
444      locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
445    tableMap = AvlTree.insert(tableMap, node);
446    return node;
447  }
448
449  private void removeTableQueue(TableName tableName) {
450    tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
451    locking.removeTableLock(tableName);
452  }
453
454  private static boolean isTableProcedure(Procedure<?> proc) {
455    return proc instanceof TableProcedureInterface;
456  }
457
458  private static TableName getTableName(Procedure<?> proc) {
459    return ((TableProcedureInterface) proc).getTableName();
460  }
461
462  // ============================================================================
463  // Server Queue Lookup Helpers
464  // ============================================================================
465  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
466    final int index = getBucketIndex(serverBuckets, serverName.hashCode());
467    ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
468    if (node != null) {
469      return node;
470    }
471    int priority;
472    if (proc != null) {
473      priority = MasterProcedureUtil.getServerPriority(proc);
474    } else {
475      priority = 1;
476    }
477    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
478    serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
479    return node;
480  }
481
482  private void removeServerQueue(ServerName serverName) {
483    int index = getBucketIndex(serverBuckets, serverName.hashCode());
484    serverBuckets[index] =
485      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
486    locking.removeServerLock(serverName);
487  }
488
489  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
490    schedLock();
491    try {
492      int index = getBucketIndex(serverBuckets, serverName.hashCode());
493      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
494      if (node == null) {
495        return;
496      }
497
498      LockAndQueue lock = locking.getServerLock(serverName);
499      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
500        removeFromRunQueue(serverRunQueue, node,
501          () -> "clean up server queue after " + proc + " completed");
502        removeServerQueue(serverName);
503      }
504    } finally {
505      schedUnlock();
506    }
507  }
508
509  private static int getBucketIndex(Object[] buckets, int hashCode) {
510    return Math.abs(hashCode) % buckets.length;
511  }
512
513  private static boolean isServerProcedure(Procedure<?> proc) {
514    return proc instanceof ServerProcedureInterface;
515  }
516
517  private static ServerName getServerName(Procedure<?> proc) {
518    return ((ServerProcedureInterface) proc).getServerName();
519  }
520
521  // ============================================================================
522  // Peer Queue Lookup Helpers
523  // ============================================================================
524  private PeerQueue getPeerQueue(String peerId) {
525    PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
526    if (node != null) {
527      return node;
528    }
529    node = new PeerQueue(peerId, locking.getPeerLock(peerId));
530    peerMap = AvlTree.insert(peerMap, node);
531    return node;
532  }
533
534  private void removePeerQueue(String peerId) {
535    peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
536    locking.removePeerLock(peerId);
537  }
538
539  private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
540    schedLock();
541    try {
542      PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
543      if (queue == null) {
544        return;
545      }
546
547      final LockAndQueue lock = locking.getPeerLock(peerId);
548      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
549        removeFromRunQueue(peerRunQueue, queue,
550          () -> "clean up peer queue after " + procedure + " completed");
551        removePeerQueue(peerId);
552      }
553    } finally {
554      schedUnlock();
555    }
556  }
557
558  private static boolean isPeerProcedure(Procedure<?> proc) {
559    return proc instanceof PeerProcedureInterface;
560  }
561
562  private static String getPeerId(Procedure<?> proc) {
563    return ((PeerProcedureInterface) proc).getPeerId();
564  }
565
566  // ============================================================================
567  // Meta Queue Lookup Helpers
568  // ============================================================================
569  private MetaQueue getMetaQueue() {
570    MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
571    if (node != null) {
572      return node;
573    }
574    node = new MetaQueue(locking.getMetaLock());
575    metaMap = AvlTree.insert(metaMap, node);
576    return node;
577  }
578
579  private static boolean isMetaProcedure(Procedure<?> proc) {
580    return proc instanceof MetaProcedureInterface;
581  }
582
583  // ============================================================================
584  // Global Queue Lookup Helpers
585  // ============================================================================
586  private GlobalQueue getGlobalQueue(String globalId) {
587    GlobalQueue node = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
588    if (node != null) {
589      return node;
590    }
591    node = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
592    globalMap = AvlTree.insert(globalMap, node);
593    return node;
594  }
595
596  private void removeGlobalQueue(String globalId) {
597    globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
598    locking.removeGlobalLock(globalId);
599  }
600
601  private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
602    schedLock();
603    try {
604      GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
605      if (queue == null) {
606        return;
607      }
608
609      final LockAndQueue lock = locking.getGlobalLock(globalId);
610      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
611        removeFromRunQueue(globalRunQueue, queue,
612          () -> "clean up global queue after " + procedure + " completed");
613        removeGlobalQueue(globalId);
614      }
615    } finally {
616      schedUnlock();
617    }
618  }
619
620  private static boolean isGlobalProcedure(Procedure<?> proc) {
621    return proc instanceof GlobalProcedureInterface;
622  }
623
624  private static String getGlobalId(Procedure<?> proc) {
625    return ((GlobalProcedureInterface) proc).getGlobalId();
626  }
627
628  // ============================================================================
629  // Table Locking Helpers
630  // ============================================================================
631  /**
632   * Get lock info for a resource of specified type and name and log details
633   */
634  private void logLockedResource(LockedResourceType resourceType, String resourceName) {
635    if (!LOG.isDebugEnabled()) {
636      return;
637    }
638
639    LockedResource lockedResource = getLockResource(resourceType, resourceName);
640    if (lockedResource != null) {
641      String msg = resourceType.toString() + " '" + resourceName + "', shared lock count="
642        + lockedResource.getSharedLockCount();
643
644      Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
645      if (proc != null) {
646        msg += ", exclusively locked by procId=" + proc.getProcId();
647      }
648      LOG.debug(msg);
649    }
650  }
651
  /**
   * Suspend the procedure if the specified table is already locked. Other operations in the
   * table-queue will be executed after the lock is released.
   * <p>
   * On success the procedure holds the namespace lock in shared mode and the table lock in
   * exclusive mode, and the table queue is taken out of the table run queue.
   * @param procedure the procedure trying to acquire the lock
   * @param table     Table to lock
   * @return true if the procedure has to wait for the table to be available
   */
  public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final String namespace = table.getNamespaceAsString();
      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
      final LockAndQueue tableLock = locking.getTableLock(table);
      // The namespace lock is taken first, in shared mode.
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        logLockedResource(LockedResourceType.NAMESPACE, namespace);
        return true;
      }
      if (!tableLock.tryExclusiveLock(procedure)) {
        // Give back the namespace shared lock taken above before waiting on the table lock.
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
        return true;
      }
      // Keep the table queue out of the run queue while the exclusive lock is held;
      // wakeTableExclusiveLock() adds it back.
      removeFromRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " held the exclusive lock");
      return false;
    } finally {
      schedUnlock();
    }
  }
683
  /**
   * Wake the procedures waiting for the specified table
   * <p>
   * Releases the table exclusive lock and the namespace shared lock taken by
   * waitTableExclusiveLock(), wakes the procedures waiting on each lock whose release call
   * returned true, and puts the table queue back into the table run queue.
   * @param procedure the procedure releasing the lock
   * @param table     the name of the table that has the exclusive lock
   */
  public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      // NOTE(review): the release calls presumably return true once the lock is free — confirm in
      // LockAndQueue.
      if (tableLock.releaseExclusiveLock(procedure)) {
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      // The queue was taken out of the run queue when the exclusive lock was acquired.
      addToRunQueue(tableRunQueue, getTableQueue(table),
        () -> procedure + " released the exclusive lock");
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }
708
709  /**
710   * Suspend the procedure if the specified table is already locked. other "read" operations in the
711   * table-queue may be executed concurrently,
712   * @param procedure the procedure trying to acquire the lock
713   * @param table     Table to lock
714   * @return true if the procedure has to wait for the table to be available
715   */
716  public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
717    return waitTableQueueSharedLock(procedure, table) == null;
718  }
719
  /**
   * Tries to take the namespace and table locks in shared mode for the procedure.
   * @param procedure the procedure trying to acquire the locks
   * @param table     the table to lock
   * @return the table queue when both shared locks were taken, or null when the procedure was
   *         handed to waitProcedure() on the lock it could not take
   */
  private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      if (!namespaceLock.trySharedLock(procedure)) {
        waitProcedure(namespaceLock, procedure);
        return null;
      }

      if (!tableLock.trySharedLock(procedure)) {
        // Give back the namespace shared lock taken above before waiting on the table lock.
        namespaceLock.releaseSharedLock();
        waitProcedure(tableLock, procedure);
        return null;
      }

      return getTableQueue(table);
    } finally {
      schedUnlock();
    }
  }
741
  /**
   * Wake the procedures waiting for the specified table
   * <p>
   * Releases the table and namespace shared locks. The table queue is put back into the run queue,
   * and the table lock's waiters are woken, only when the table lock release returns true
   * (presumably when the last shared holder leaves — confirm in LockAndQueue).
   * @param procedure the procedure releasing the lock
   * @param table     the name of the table that has the shared lock
   */
  public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
    schedLock();
    try {
      final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
      final LockAndQueue tableLock = locking.getTableLock(table);
      int waitingCount = 0;
      if (tableLock.releaseSharedLock()) {
        addToRunQueue(tableRunQueue, getTableQueue(table),
          () -> procedure + " released the shared lock");
        waitingCount += wakeWaitingProcedures(tableLock);
      }
      if (namespaceLock.releaseSharedLock()) {
        waitingCount += wakeWaitingProcedures(namespaceLock);
      }
      wakePollIfNeeded(waitingCount);
    } finally {
      schedUnlock();
    }
  }
766
767  /**
768   * Tries to remove the queue and the table-lock of the specified table. If there are new
769   * operations pending (e.g. a new create), the remove will not be performed.
770   * @param table     the name of the table that should be marked as deleted
771   * @param procedure the procedure that is removing the table
772   * @return true if deletion succeeded, false otherwise meaning that there are other new operations
773   *         pending for that table (e.g. a new create).
774   */
775  boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
776    schedLock();
777    try {
778      final TableQueue queue = getTableQueue(table);
779      final LockAndQueue tableLock = locking.getTableLock(table);
780      if (queue == null) {
781        return true;
782      }
783
784      if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
785        // remove the table from the run-queue and the map
786        if (AvlIterableList.isLinked(queue)) {
787          tableRunQueue.remove(queue);
788        }
789        removeTableQueue(table);
790      } else {
791        // TODO: If there are no create, we can drop all the other ops
792        return false;
793      }
794    } finally {
795      schedUnlock();
796    }
797    return true;
798  }
799
800  // ============================================================================
801  // Region Locking Helpers
802  // ============================================================================
803  /**
804   * Suspend the procedure if the specified region is already locked.
805   * @param procedure  the procedure trying to acquire the lock on the region
806   * @param regionInfo the region we are trying to lock
807   * @return true if the procedure has to wait for the regions to be available
808   */
809  public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
810    return waitRegions(procedure, regionInfo.getTable(), regionInfo);
811  }
812
813  /**
814   * Suspend the procedure if the specified set of regions are already locked.
815   * @param procedure   the procedure trying to acquire the lock on the regions
816   * @param table       the table name of the regions we are trying to lock
817   * @param regionInfos the list of regions we are trying to lock
818   * @return true if the procedure has to wait for the regions to be available
819   */
820  public boolean waitRegions(final Procedure<?> procedure, final TableName table,
821    final RegionInfo... regionInfos) {
822    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
823    schedLock();
824    try {
825      assert table != null;
826      if (waitTableSharedLock(procedure, table)) {
827        return true;
828      }
829
830      // acquire region xlocks or wait
831      boolean hasLock = true;
832      final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
833      for (int i = 0; i < regionInfos.length; ++i) {
834        assert regionInfos[i] != null;
835        assert regionInfos[i].getTable() != null;
836        assert regionInfos[i].getTable().equals(table) : regionInfos[i] + " " + procedure;
837        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
838          : "duplicate region: " + regionInfos[i];
839
840        regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
841        if (!regionLocks[i].tryExclusiveLock(procedure)) {
842          LOG.info("Waiting on xlock for {} held by pid={}", procedure,
843            regionLocks[i].getExclusiveLockProcIdOwner());
844          waitProcedure(regionLocks[i], procedure);
845          hasLock = false;
846          while (i-- > 0) {
847            regionLocks[i].releaseExclusiveLock(procedure);
848          }
849          break;
850        } else {
851          LOG.info("Took xlock for {}", procedure);
852        }
853      }
854
855      if (!hasLock) {
856        wakeTableSharedLock(procedure, table);
857      }
858      return !hasLock;
859    } finally {
860      schedUnlock();
861    }
862  }
863
864  /**
865   * Wake the procedures waiting for the specified region
866   * @param procedure  the procedure that was holding the region
867   * @param regionInfo the region the procedure was holding
868   */
869  public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
870    wakeRegions(procedure, regionInfo.getTable(), regionInfo);
871  }
872
873  /**
874   * Wake the procedures waiting for the specified regions
875   * @param procedure   the procedure that was holding the regions
876   * @param regionInfos the list of regions the procedure was holding
877   */
878  public void wakeRegions(final Procedure<?> procedure, final TableName table,
879    final RegionInfo... regionInfos) {
880    Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
881    schedLock();
882    try {
883      int numProcs = 0;
884      final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
885      for (int i = 0; i < regionInfos.length; ++i) {
886        assert regionInfos[i].getTable().equals(table);
887        assert i == 0 || regionInfos[i] != regionInfos[i - 1]
888          : "duplicate region: " + regionInfos[i];
889
890        LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
891        if (regionLock.releaseExclusiveLock(procedure)) {
892          if (!regionLock.isWaitingQueueEmpty()) {
893            // release one procedure at the time since regions has an xlock
894            nextProcs[numProcs++] = regionLock.removeFirst();
895          } else {
896            locking.removeRegionLock(regionInfos[i].getEncodedName());
897          }
898        }
899      }
900
901      // awake procedures if any
902      for (int i = numProcs - 1; i >= 0; --i) {
903        wakeProcedure(nextProcs[i]);
904      }
905      wakePollIfNeeded(numProcs);
906      // release the table shared-lock.
907      wakeTableSharedLock(procedure, table);
908    } finally {
909      schedUnlock();
910    }
911  }
912
913  // ============================================================================
914  // Namespace Locking Helpers
915  // ============================================================================
916  /**
917   * Suspend the procedure if the specified namespace is already locked.
918   * @see #wakeNamespaceExclusiveLock(Procedure,String)
919   * @param procedure the procedure trying to acquire the lock
920   * @param namespace Namespace to lock
921   * @return true if the procedure has to wait for the namespace to be available
922   */
923  public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
924    schedLock();
925    try {
926      final LockAndQueue systemNamespaceTableLock =
927        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
928      if (!systemNamespaceTableLock.trySharedLock(procedure)) {
929        waitProcedure(systemNamespaceTableLock, procedure);
930        logLockedResource(LockedResourceType.TABLE,
931          TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString());
932        return true;
933      }
934
935      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
936      if (!namespaceLock.tryExclusiveLock(procedure)) {
937        systemNamespaceTableLock.releaseSharedLock();
938        waitProcedure(namespaceLock, procedure);
939        logLockedResource(LockedResourceType.NAMESPACE, namespace);
940        return true;
941      }
942      return false;
943    } finally {
944      schedUnlock();
945    }
946  }
947
948  /**
949   * Wake the procedures waiting for the specified namespace
950   * @see #waitNamespaceExclusiveLock(Procedure,String)
951   * @param procedure the procedure releasing the lock
952   * @param namespace the namespace that has the exclusive lock
953   */
954  public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
955    schedLock();
956    try {
957      final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
958      final LockAndQueue systemNamespaceTableLock =
959        locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
960      int waitingCount = 0;
961      if (namespaceLock.releaseExclusiveLock(procedure)) {
962        waitingCount += wakeWaitingProcedures(namespaceLock);
963      }
964      if (systemNamespaceTableLock.releaseSharedLock()) {
965        addToRunQueue(tableRunQueue,
966          getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME),
967          () -> procedure + " released namespace exclusive lock");
968        waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
969      }
970      wakePollIfNeeded(waitingCount);
971    } finally {
972      schedUnlock();
973    }
974  }
975
976  // ============================================================================
977  // Server Locking Helpers
978  // ============================================================================
979  /**
980   * Try to acquire the exclusive lock on the specified server.
981   * @see #wakeServerExclusiveLock(Procedure,ServerName)
982   * @param procedure  the procedure trying to acquire the lock
983   * @param serverName Server to lock
984   * @return true if the procedure has to wait for the server to be available
985   */
986  public boolean waitServerExclusiveLock(final Procedure<?> procedure,
987    final ServerName serverName) {
988    schedLock();
989    try {
990      final LockAndQueue lock = locking.getServerLock(serverName);
991      if (lock.tryExclusiveLock(procedure)) {
992        // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
993        // so.
994        removeFromRunQueue(serverRunQueue,
995          getServerQueue(serverName,
996            procedure instanceof ServerProcedureInterface
997              ? (ServerProcedureInterface) procedure
998              : null),
999          () -> procedure + " held exclusive lock");
1000        return false;
1001      }
1002      waitProcedure(lock, procedure);
1003      logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
1004      return true;
1005    } finally {
1006      schedUnlock();
1007    }
1008  }
1009
1010  /**
1011   * Wake the procedures waiting for the specified server
1012   * @see #waitServerExclusiveLock(Procedure,ServerName)
1013   * @param procedure  the procedure releasing the lock
1014   * @param serverName the server that has the exclusive lock
1015   */
1016  public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
1017    schedLock();
1018    try {
1019      final LockAndQueue lock = locking.getServerLock(serverName);
1020      // Only SCP will acquire/release server lock so do not need to check the return value here.
1021      lock.releaseExclusiveLock(procedure);
1022      // In tests we may pass procedures other than ServerProcedureInterface, just pass null if
1023      // so.
1024      addToRunQueue(serverRunQueue,
1025        getServerQueue(serverName,
1026          procedure instanceof ServerProcedureInterface
1027            ? (ServerProcedureInterface) procedure
1028            : null),
1029        () -> procedure + " released exclusive lock");
1030      int waitingCount = wakeWaitingProcedures(lock);
1031      wakePollIfNeeded(waitingCount);
1032    } finally {
1033      schedUnlock();
1034    }
1035  }
1036
1037  // ============================================================================
1038  // Peer Locking Helpers
1039  // ============================================================================
1040  /**
1041   * Try to acquire the exclusive lock on the specified peer.
1042   * @see #wakePeerExclusiveLock(Procedure, String)
1043   * @param procedure the procedure trying to acquire the lock
1044   * @param peerId    peer to lock
1045   * @return true if the procedure has to wait for the peer to be available
1046   */
1047  public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
1048    schedLock();
1049    try {
1050      final LockAndQueue lock = locking.getPeerLock(peerId);
1051      if (lock.tryExclusiveLock(procedure)) {
1052        removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
1053          () -> procedure + " held exclusive lock");
1054        return false;
1055      }
1056      waitProcedure(lock, procedure);
1057      logLockedResource(LockedResourceType.PEER, peerId);
1058      return true;
1059    } finally {
1060      schedUnlock();
1061    }
1062  }
1063
1064  /**
1065   * Wake the procedures waiting for the specified peer
1066   * @see #waitPeerExclusiveLock(Procedure, String)
1067   * @param procedure the procedure releasing the lock
1068   * @param peerId    the peer that has the exclusive lock
1069   */
1070  public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
1071    schedLock();
1072    try {
1073      final LockAndQueue lock = locking.getPeerLock(peerId);
1074      if (lock.releaseExclusiveLock(procedure)) {
1075        addToRunQueue(peerRunQueue, getPeerQueue(peerId),
1076          () -> procedure + " released exclusive lock");
1077        int waitingCount = wakeWaitingProcedures(lock);
1078        wakePollIfNeeded(waitingCount);
1079      }
1080    } finally {
1081      schedUnlock();
1082    }
1083  }
1084
1085  // ============================================================================
1086  // Meta Locking Helpers
1087  // ============================================================================
1088  /**
1089   * Try to acquire the exclusive lock on meta.
1090   * @see #wakeMetaExclusiveLock(Procedure)
1091   * @param procedure the procedure trying to acquire the lock
1092   * @return true if the procedure has to wait for meta to be available
1093   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1094   *             {@link RecoverMetaProcedure}.
1095   */
1096  @Deprecated
1097  public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
1098    schedLock();
1099    try {
1100      final LockAndQueue lock = locking.getMetaLock();
1101      if (lock.tryExclusiveLock(procedure)) {
1102        removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
1103        return false;
1104      }
1105      waitProcedure(lock, procedure);
1106      logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
1107      return true;
1108    } finally {
1109      schedUnlock();
1110    }
1111  }
1112
1113  /**
1114   * Wake the procedures waiting for meta.
1115   * @see #waitMetaExclusiveLock(Procedure)
1116   * @param procedure the procedure releasing the lock
1117   * @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
1118   *             {@link RecoverMetaProcedure}.
1119   */
1120  @Deprecated
1121  public void wakeMetaExclusiveLock(Procedure<?> procedure) {
1122    schedLock();
1123    try {
1124      final LockAndQueue lock = locking.getMetaLock();
1125      lock.releaseExclusiveLock(procedure);
1126      addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
1127      int waitingCount = wakeWaitingProcedures(lock);
1128      wakePollIfNeeded(waitingCount);
1129    } finally {
1130      schedUnlock();
1131    }
1132  }
1133
1134  // ============================================================================
1135  // Global Locking Helpers
1136  // ============================================================================
1137  /**
1138   * Try to acquire the share lock on global.
1139   * @see #wakeGlobalExclusiveLock(Procedure, String)
1140   * @param procedure the procedure trying to acquire the lock
1141   * @return true if the procedure has to wait for global to be available
1142   */
1143  public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1144    schedLock();
1145    try {
1146      final LockAndQueue lock = locking.getGlobalLock(globalId);
1147      if (lock.tryExclusiveLock(procedure)) {
1148        removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
1149          () -> procedure + " held shared lock");
1150        return false;
1151      }
1152      waitProcedure(lock, procedure);
1153      logLockedResource(LockedResourceType.GLOBAL, HConstants.EMPTY_STRING);
1154      return true;
1155    } finally {
1156      schedUnlock();
1157    }
1158  }
1159
1160  /**
1161   * Wake the procedures waiting for global.
1162   * @see #waitGlobalExclusiveLock(Procedure, String)
1163   * @param procedure the procedure releasing the lock
1164   */
1165  public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
1166    schedLock();
1167    try {
1168      final LockAndQueue lock = locking.getGlobalLock(globalId);
1169      lock.releaseExclusiveLock(procedure);
1170      addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
1171        () -> procedure + " released shared lock");
1172      int waitingCount = wakeWaitingProcedures(lock);
1173      wakePollIfNeeded(waitingCount);
1174    } finally {
1175      schedUnlock();
1176    }
1177  }
1178
1179  /**
1180   * For debugging. Expensive.
1181   */
1182  public String dumpLocks() throws IOException {
1183    schedLock();
1184    try {
1185      // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
1186      return this.locking.toString();
1187    } finally {
1188      schedUnlock();
1189    }
1190  }
1191
1192  private void serverBucketToString(ToStringBuilder builder, String queueName, Queue<?> queue) {
1193    int size = queueSize(queue);
1194    if (size != 0) {
1195      builder.append(queueName, queue);
1196    }
1197  }
1198
1199  @Override
1200  public String toString() {
1201    ToStringBuilder builder =
1202      new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(super.toString());
1203    schedLock();
1204    try {
1205      for (int i = 0; i < serverBuckets.length; i++) {
1206        serverBucketToString(builder, "serverBuckets[" + i + "]", serverBuckets[i]);
1207      }
1208      builder.append("tableMap", tableMap);
1209      builder.append("tableWaitingMap", tableProcsWaitingEnqueue);
1210      builder.append("peerMap", peerMap);
1211      builder.append("metaMap", metaMap);
1212      builder.append("globalMap", globalMap);
1213    } finally {
1214      schedUnlock();
1215    }
1216    return builder.build();
1217  }
1218}