001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.util.function.Function; 021import java.util.function.Predicate; 022import java.util.stream.Stream; 023import org.apache.commons.lang3.builder.ToStringBuilder; 024import org.apache.commons.lang3.builder.ToStringStyle; 025import org.apache.yetus.audience.InterfaceAudience; 026 027/** 028 * Locking for mutual exclusion between procedures. Used only by procedure framework internally. 029 * {@link LockAndQueue} has two purposes: 030 * <ol> 031 * <li>Acquire/release exclusive/shared locks.</li> 032 * <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} extends 033 * {@link ProcedureDeque} class. Blocked Procedures are added to our super Deque. Using inheritance 034 * over composition to keep the Deque of waiting Procedures is unusual, but we do it this way 035 * because in certain cases, there will be millions of regions. This layout uses less memory. 036 * </ol> 037 * <p/> 038 * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are 039 * guarded by schedLock(). <br/> 040 * There is no need of 'volatile' keyword for member variables because of memory synchronization 041 * guarantees of locks (see 042 * <a href="http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html">Memory 043 * Synchronization</a>) <br/> 044 * We do not implement Lock interface because we need exclusive and shared locking, and also because 045 * try-lock functions require procedure id. <br/> 046 * We do not use ReentrantReadWriteLock directly because of its high memory overhead. 047 */ 048@InterfaceAudience.Private 049public class LockAndQueue implements LockStatus { 050 051 private final Function<Long, Procedure<?>> procedureRetriever; 052 private final ProcedureDeque queue = new ProcedureDeque(); 053 private Procedure<?> exclusiveLockOwnerProcedure = null; 054 private int sharedLock = 0; 055 056 // ====================================================================== 057 // Lock Status 058 // ====================================================================== 059 060 public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) { 061 this.procedureRetriever = procedureRetriever; 062 } 063 064 @Override 065 public boolean hasExclusiveLock() { 066 return this.exclusiveLockOwnerProcedure != null; 067 } 068 069 @Override 070 public boolean hasLockAccess(Procedure<?> proc) { 071 if (exclusiveLockOwnerProcedure == null) { 072 return false; 073 } 074 long lockOwnerId = exclusiveLockOwnerProcedure.getProcId(); 075 if (proc.getProcId() == lockOwnerId) { 076 return true; 077 } 078 if (!proc.hasParent()) { 079 return false; 080 } 081 // fast path to check root procedure 082 if (proc.getRootProcId() == lockOwnerId) { 083 return true; 084 } 085 // check ancestors 086 for (Procedure<?> p = proc;;) { 087 if (p.getParentProcId() == lockOwnerId) { 088 return true; 089 } 090 p = procedureRetriever.apply(p.getParentProcId()); 091 if (p == null || !p.hasParent()) { 092 return false; 093 } 094 } 095 } 096 097 @Override 098 public Procedure<?> getExclusiveLockOwnerProcedure() { 099 return exclusiveLockOwnerProcedure; 100 } 101 102 @Override 103 public int getSharedLockCount() { 104 return sharedLock; 105 } 106 107 // ====================================================================== 108 // try/release Shared/Exclusive lock 109 // ====================================================================== 110 111 /** Returns whether we have succesfully acquired the shared lock. */ 112 public boolean trySharedLock(Procedure<?> proc) { 113 if (hasExclusiveLock() && !hasLockAccess(proc)) { 114 return false; 115 } 116 // If no one holds the xlock, then we are free to hold the sharedLock 117 // If the parent proc or we have already held the xlock, then we return true here as 118 // xlock is more powerful then shared lock. 119 sharedLock++; 120 return true; 121 } 122 123 /** Returns whether we should wake the procedures waiting on the lock here. */ 124 public boolean releaseSharedLock() { 125 // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our 126 // parent have held the xlock. And since there is still an exclusive lock, we do not need to 127 // wake any procedures. 128 return --sharedLock == 0 && !hasExclusiveLock(); 129 } 130 131 public boolean tryExclusiveLock(Procedure<?> proc) { 132 if (isLocked()) { 133 return hasLockAccess(proc); 134 } 135 exclusiveLockOwnerProcedure = proc; 136 return true; 137 } 138 139 /** Returns whether we should wake the procedures waiting on the lock here. */ 140 public boolean releaseExclusiveLock(Procedure<?> proc) { 141 if ( 142 exclusiveLockOwnerProcedure == null 143 || exclusiveLockOwnerProcedure.getProcId() != proc.getProcId() 144 ) { 145 // We are not the lock owner, it is probably inherited from the parent procedures. 146 return false; 147 } 148 exclusiveLockOwnerProcedure = null; 149 // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent 150 // proc or we have already held the xlock, and also allow releasing the locks in any order, so 151 // it could happen that the xlock is released but there are still some procs holding the shared 152 // lock. 153 // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which 154 // acquire the shared lock on the same lock. This is because we will schedule the sub proces 155 // before releasing the lock, so the sub procs could call acquire lock before we releasing the 156 // xlock. 157 return sharedLock == 0; 158 } 159 160 public boolean isWaitingQueueEmpty() { 161 return queue.isEmpty(); 162 } 163 164 public Procedure<?> removeFirst() { 165 return queue.removeFirst(); 166 } 167 168 public void addLast(Procedure<?> proc) { 169 queue.addLast(proc); 170 } 171 172 public int wakeWaitingProcedures(ProcedureScheduler scheduler) { 173 int count = queue.size(); 174 // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so 175 // that the procedure which was added first goes in the front for the scheduler queue. 176 scheduler.addFront(queue.descendingIterator()); 177 queue.clear(); 178 return count; 179 } 180 181 @SuppressWarnings("rawtypes") 182 public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) { 183 return queue.stream().filter(predicate); 184 } 185 186 @Override 187 public String toString() { 188 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) 189 .appendSuper(describeLockStatus()).append("waitingProcCount", queue.size()).build(); 190 } 191}