001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.Collection; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.RejectedExecutionException; 028import java.util.concurrent.SynchronousQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.hbase.errorhandling.ForeignException; 032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 033import org.apache.hadoop.hbase.util.Threads; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker; 039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 040 041/** 042 * This is the master side of a distributed complex procedure execution. 043 * <p> 044 * The {@link Procedure} is generic and subclassing or customization shouldn't be necessary -- any 045 * customization should happen just in {@link Subprocedure}s. 046 */ 047@InterfaceAudience.Private 048public class ProcedureCoordinator { 049 private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class); 050 051 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000; 052 final static long TIMEOUT_MILLIS_DEFAULT = 60000; 053 final static long WAKE_MILLIS_DEFAULT = 500; 054 055 private final ProcedureCoordinatorRpcs rpcs; 056 private final ExecutorService pool; 057 private final long wakeTimeMillis; 058 private final long timeoutMillis; 059 060 // Running procedure table. Maps procedure name to running procedure reference 061 private final ConcurrentMap<String, Procedure> procedures = 062 new MapMaker().concurrencyLevel(4).weakValues().makeMap(); 063 064 /** 065 * Create and start a ProcedureCoordinator. The rpc object registers the ProcedureCoordinator and 066 * starts any threads in this constructor. 067 * @param pool Used for executing procedures. 068 */ 069 // Only used in tests. SimpleMasterProcedureManager is a test class. 070 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) { 071 this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT); 072 } 073 074 /** 075 * Create and start a ProcedureCoordinator. The rpc object registers the ProcedureCoordinator and 076 * starts any threads in this constructor. 077 * @param pool Used for executing procedures. 078 */ 079 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, 080 long timeoutMillis, long wakeTimeMillis) { 081 this.timeoutMillis = timeoutMillis; 082 this.wakeTimeMillis = wakeTimeMillis; 083 this.rpcs = rpcs; 084 this.pool = pool; 085 this.rpcs.start(this); 086 } 087 088 /** 089 * Default thread pool for the procedure 090 * @param opThreads the maximum number of threads to allow in the pool 091 */ 092 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) { 093 return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT); 094 } 095 096 /** 097 * Default thread pool for the procedure 098 * @param opThreads the maximum number of threads to allow in the pool 099 * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks 100 */ 101 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, 102 long keepAliveMillis) { 103 return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, 104 new SynchronousQueue<>(), 105 new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d") 106 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 107 } 108 109 /** 110 * Shutdown the thread pools and release rpc resources 111 */ 112 public void close() throws IOException { 113 // have to use shutdown now to break any latch waiting 114 pool.shutdownNow(); 115 rpcs.close(); 116 } 117 118 /** 119 * Submit an procedure to kick off its dependent subprocedures. 120 * @param proc Procedure to execute 121 * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the procedure 122 * or any subprocedures could not be started. Failure could be due to submitting a 123 * procedure multiple times (or one with the same name), or some sort of IO problem. On 124 * errors, the procedure's monitor holds a reference to the exception that caused the 125 * failure. 126 */ 127 @SuppressWarnings("FutureReturnValueIgnored") 128 boolean submitProcedure(Procedure proc) { 129 // if the submitted procedure was null, then we don't want to run it 130 if (proc == null) { 131 return false; 132 } 133 String procName = proc.getName(); 134 135 // make sure we aren't already running a procedure of that name 136 Procedure oldProc = procedures.get(procName); 137 if (oldProc != null) { 138 // procedures are always eventually completed on both successful and failed execution 139 try { 140 if (!oldProc.isCompleted()) { 141 LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); 142 return false; 143 } else { 144 LOG.debug("Procedure " + procName 145 + " was in running list but was completed. Accepting new attempt."); 146 if (!procedures.remove(procName, oldProc)) { 147 LOG.warn("Procedure " + procName 148 + " has been resubmitted by another thread. Rejecting this request."); 149 return false; 150 } 151 } 152 } catch (ForeignException e) { 153 LOG.debug("Procedure " + procName 154 + " was in running list but has exception. Accepting new attempt."); 155 if (!procedures.remove(procName, oldProc)) { 156 LOG.warn("Procedure " + procName 157 + " has been resubmitted by another thread. Rejecting this request."); 158 return false; 159 } 160 } 161 } 162 163 // kick off the procedure's execution in a separate thread 164 try { 165 if (this.procedures.putIfAbsent(procName, proc) == null) { 166 LOG.debug("Submitting procedure " + procName); 167 this.pool.submit(proc); 168 return true; 169 } else { 170 LOG.error( 171 "Another thread has submitted procedure '" + procName + "'. Ignoring this attempt."); 172 return false; 173 } 174 } catch (RejectedExecutionException e) { 175 LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e); 176 // Remove the procedure from the list since is not started 177 this.procedures.remove(procName, proc); 178 // the thread pool is full and we can't run the procedure 179 proc.receive(new ForeignException(procName, e)); 180 } 181 return false; 182 } 183 184 /** 185 * The connection to the rest of the procedure group (members and coordinator) has been 186 * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other 187 * members since we cannot reach them anymore. 188 * @param message description of the error 189 * @param cause the actual cause of the failure 190 */ 191 void rpcConnectionFailure(final String message, final IOException cause) { 192 Collection<Procedure> toNotify = procedures.values(); 193 194 boolean isTraceEnabled = LOG.isTraceEnabled(); 195 LOG.debug("received connection failure: " + message, cause); 196 for (Procedure proc : toNotify) { 197 if (proc == null) { 198 continue; 199 } 200 // notify the elements, if they aren't null 201 if (isTraceEnabled) { 202 LOG.trace("connection failure - notify procedure: " + proc.getName()); 203 } 204 proc.receive(new ForeignException(proc.getName(), cause)); 205 } 206 } 207 208 /** 209 * Abort the procedure with the given name 210 * @param procName name of the procedure to abort 211 * @param reason serialized information about the abort 212 */ 213 public void abortProcedure(String procName, ForeignException reason) { 214 LOG.debug("abort procedure " + procName, reason); 215 // if we know about the Procedure, notify it 216 Procedure proc = procedures.get(procName); 217 if (proc == null) { 218 return; 219 } 220 proc.receive(reason); 221 } 222 223 /** 224 * Exposed for hooking with unit tests. 225 * @return the newly created procedure 226 */ 227 Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, 228 List<String> expectedMembers) { 229 // build the procedure 230 return new Procedure(this, fed, wakeTimeMillis, timeoutMillis, procName, procArgs, 231 expectedMembers); 232 } 233 234 /** 235 * Kick off the named procedure Currently only one procedure with the same type and name is 236 * allowed to run at a time. 237 * @param procName name of the procedure to start 238 * @param procArgs arguments for the procedure 239 * @param expectedMembers expected members to start 240 * @return handle to the running procedure, if it was started correctly, <tt>null</tt> otherwise. 241 * Null could be due to submitting a procedure multiple times (or one with the same name), 242 * or runtime exception. Check the procedure's monitor that holds a reference to the 243 * exception that caused the failure. 244 */ 245 public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, 246 List<String> expectedMembers) { 247 Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers); 248 if (!this.submitProcedure(proc)) { 249 LOG.error("Failed to submit procedure '" + procName + "'"); 250 return null; 251 } 252 return proc; 253 } 254 255 /** 256 * Notification that the procedure had the specified member acquired its part of the barrier via 257 * {@link Subprocedure#acquireBarrier()}. 258 * @param procName name of the procedure that acquired 259 * @param member name of the member that acquired 260 */ 261 void memberAcquiredBarrier(String procName, final String member) { 262 Procedure proc = procedures.get(procName); 263 if (proc == null) { 264 LOG.warn( 265 "Member '" + member + "' is trying to acquire an unknown procedure '" + procName + "'"); 266 return; 267 } 268 if (LOG.isTraceEnabled()) { 269 LOG.trace("Member '" + member + "' acquired procedure '" + procName + "'"); 270 } 271 proc.barrierAcquiredByMember(member); 272 } 273 274 /** 275 * Notification that the procedure had another member finished executing its in-barrier subproc 276 * via {@link Subprocedure#insideBarrier()}. 277 * @param procName name of the subprocedure that finished 278 * @param member name of the member that executed and released its barrier 279 * @param dataFromMember the data that the member returned along with the notification 280 */ 281 void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) { 282 Procedure proc = procedures.get(procName); 283 if (proc == null) { 284 LOG.warn( 285 "Member '" + member + "' is trying to release an unknown procedure '" + procName + "'"); 286 return; 287 } 288 if (LOG.isTraceEnabled()) { 289 LOG.trace("Member '" + member + "' released procedure '" + procName + "'"); 290 } 291 proc.barrierReleasedByMember(member, dataFromMember); 292 } 293 294 /** Returns the rpcs implementation for all current procedures */ 295 ProcedureCoordinatorRpcs getRpcs() { 296 return rpcs; 297 } 298 299 /** 300 * Returns the procedure. This Procedure is a live instance so should not be modified but can be 301 * inspected. 302 * @param name Name of the procedure 303 * @return Procedure or null if not present any more 304 */ 305 public Procedure getProcedure(String name) { 306 return procedures.get(name); 307 } 308 309 /** Returns Return set of all procedure names. */ 310 public Set<String> getProcedureNames() { 311 return new HashSet<>(procedures.keySet()); 312 } 313}