001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.concurrent.Callable; 022import java.util.concurrent.CountDownLatch; 023import org.apache.hadoop.hbase.errorhandling.ForeignException; 024import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 025import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener; 026import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; 027import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.zookeeper.KeeperException; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator 035 * which communicates with ProcedureMembers who create and start its part of the Procedure. This sub 036 * part is called a Subprocedure Users should subclass this and implement {@link #acquireBarrier()} 037 * (get local barrier for this member), {@link #insideBarrier()} (execute while globally barriered 038 * and release barrier) and {@link #cleanup(Exception)} (release state associated with 039 * subprocedure.) When submitted to a ProcedureMember, the call method is executed in a separate 040 * thread. Latches are use too block its progress and trigger continuations when barrier conditions 041 * are met. Exception that makes it out of calls to {@link #acquireBarrier()} or 042 * {@link #insideBarrier()} gets converted into {@link ForeignException}, which will get propagated 043 * to the {@link ProcedureCoordinator}. There is a category of procedure (ex: online-snapshots), and 044 * a user-specified instance-specific barrierName. (ex: snapshot121126). 045 */ 046@InterfaceAudience.Private 047abstract public class Subprocedure implements Callable<Void> { 048 private static final Logger LOG = LoggerFactory.getLogger(Subprocedure.class); 049 050 // Name of the procedure 051 final private String barrierName; 052 053 // 054 // Execution state 055 // 056 057 /** wait on before allowing the in barrier phase to proceed */ 058 private final CountDownLatch inGlobalBarrier; 059 /** counted down when the Subprocedure has completed */ 060 private final CountDownLatch releasedLocalBarrier; 061 062 // 063 // Error handling 064 // 065 /** monitor to check for errors */ 066 protected final ForeignExceptionDispatcher monitor; 067 /** frequency to check for errors (ms) */ 068 protected final long wakeFrequency; 069 protected final TimeoutExceptionInjector executionTimeoutTimer; 070 protected final ProcedureMemberRpcs rpcs; 071 072 private volatile boolean complete = false; 073 074 /** 075 * @param member reference to the member managing this subprocedure 076 * @param procName name of the procedure this subprocedure is associated with 077 * @param monitor notified if there is an error in the subprocedure 078 * @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in 079 * milliseconds). 080 * @param timeout time in millis that will trigger a subprocedure abort if it has not 081 * completed 082 */ 083 public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor, 084 long wakeFrequency, long timeout) { 085 // Asserts should be caught during unit testing 086 assert member != null : "procedure member should be non-null"; 087 assert member.getRpcs() != null : "rpc handlers should be non-null"; 088 assert procName != null : "procedure name should be non-null"; 089 assert monitor != null : "monitor should be non-null"; 090 091 // Default to a very large timeout 092 this.rpcs = member.getRpcs(); 093 this.barrierName = procName; 094 this.monitor = monitor; 095 // forward any failures to coordinator. Since this is a dispatcher, resend loops should not be 096 // possible. 097 this.monitor.addListener(new ForeignExceptionListener() { 098 @Override 099 public void receive(ForeignException ee) { 100 // if this is a notification from a remote source, just log 101 if (ee.isRemote()) { 102 LOG.debug("Was remote foreign exception, not redispatching error", ee); 103 return; 104 } 105 // if this is a local KeeperException, don't attempt to notify other members 106 if (ee.getCause() instanceof KeeperException) { 107 LOG.debug("Was KeeperException, not redispatching error", ee); 108 return; 109 } 110 // if it is other local error, then send it to the coordinator 111 try { 112 rpcs.sendMemberAborted(Subprocedure.this, ee); 113 } catch (IOException e) { 114 // this will fail all the running procedures, since the connection is down 115 LOG.error("Can't reach controller, not propagating error", e); 116 } 117 } 118 }); 119 120 this.wakeFrequency = wakeFrequency; 121 this.inGlobalBarrier = new CountDownLatch(1); 122 this.releasedLocalBarrier = new CountDownLatch(1); 123 124 // accept error from timer thread, this needs to be started. 125 this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout); 126 } 127 128 public String getName() { 129 return barrierName; 130 } 131 132 public String getMemberName() { 133 return rpcs.getMemberName(); 134 } 135 136 private void rethrowException() throws ForeignException { 137 monitor.rethrowException(); 138 } 139 140 /** 141 * Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods while 142 * keeping some state for other threads to access. This would normally be executed by the 143 * ProcedureMember when a acquire message comes from the coordinator. Rpcs are used to spend 144 * message back to the coordinator after different phases are executed. Any exceptions caught 145 * during the execution (except for InterruptedException) get converted and propagated to 146 * coordinator via {@link ProcedureMemberRpcs#sendMemberAborted( Subprocedure, ForeignException)}. 147 */ 148 @SuppressWarnings("finally") 149 @Override 150 final public Void call() { 151 LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " 152 + executionTimeoutTimer.getMaxTime() + "ms"); 153 // start the execution timeout timer 154 executionTimeoutTimer.start(); 155 156 try { 157 // start by checking for error first 158 rethrowException(); 159 LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage"); 160 acquireBarrier(); 161 LOG.debug("Subprocedure '" + barrierName + "' locally acquired"); 162 rethrowException(); 163 164 // vote yes to coordinator about being prepared 165 rpcs.sendMemberAcquired(this); 166 LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" 167 + " 'reached' or 'abort' from coordinator"); 168 169 // wait for the procedure to reach global barrier before proceding 170 waitForReachedGlobalBarrier(); 171 rethrowException(); // if Coordinator aborts, will bail from here with exception 172 173 // In traditional 2PC, if a member reaches this state the TX has been committed and the 174 // member is responsible for rolling forward and recovering and completing the subsequent 175 // operations in the case of failure. It cannot rollback. 176 // 177 // This implementation is not 2PC since it can still rollback here, and thus has different 178 // semantics. 179 180 LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); 181 byte[] dataToCoordinator = insideBarrier(); 182 LOG.debug("Subprocedure '" + barrierName + "' locally completed"); 183 rethrowException(); 184 185 // Ack that the member has executed and released local barrier 186 rpcs.sendMemberCompleted(this, dataToCoordinator); 187 LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion"); 188 189 // make sure we didn't get an external exception 190 rethrowException(); 191 } catch (Exception e) { 192 String msg = null; 193 if (e instanceof InterruptedException) { 194 msg = "Procedure '" + barrierName + "' aborting due to interrupt!" 195 + " Likely due to pool shutdown."; 196 Thread.currentThread().interrupt(); 197 } else if (e instanceof ForeignException) { 198 msg = "Subprocedure '" + barrierName + "' aborting due to a ForeignException!"; 199 } else { 200 msg = "Subprocedure '" + barrierName + "' failed!"; 201 } 202 cancel(msg, e); 203 204 LOG.debug("Subprocedure '" + barrierName + "' running cleanup."); 205 cleanup(e); 206 } finally { 207 releasedLocalBarrier.countDown(); 208 209 // tell the timer we are done, if we get here successfully 210 executionTimeoutTimer.complete(); 211 complete = true; 212 LOG.debug("Subprocedure '" + barrierName + "' completed."); 213 return null; 214 } 215 } 216 217 boolean isComplete() { 218 return complete; 219 } 220 221 /** 222 * exposed for testing. 223 */ 224 ForeignExceptionSnare getErrorCheckable() { 225 return this.monitor; 226 } 227 228 /** 229 * The implementation of this method should gather and hold required resources (locks, disk space, 230 * etc) to satisfy the Procedures barrier condition. For example, this would be where to make all 231 * the regions on a RS on the quiescent for an procedure that required all regions to be globally 232 * quiesed. Users should override this method. If a quiescent is not required, this is overkill 233 * but can still be used to execute a procedure on all members and to propagate any exceptions. 234 */ 235 abstract public void acquireBarrier() throws ForeignException; 236 237 /** 238 * The implementation of this method should act with the assumption that the barrier condition has 239 * been satisfied. Continuing the previous example, a condition could be that all RS's globally 240 * have been quiesced, and procedures that require this precondition could be implemented here. 241 * The implementation should also collect the result of the subprocedure as data to be returned to 242 * the coordinator upon successful completion. Users should override this method. 243 * @return the data the subprocedure wants to return to coordinator side. 244 */ 245 abstract public byte[] insideBarrier() throws ForeignException; 246 247 /** 248 * Users should override this method. This implementation of this method should rollback and 249 * cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have 250 * created. 251 */ 252 abstract public void cleanup(Exception e); 253 254 /** 255 * Method to cancel the Subprocedure by injecting an exception from and external source. 256 */ 257 public void cancel(String msg, Throwable cause) { 258 LOG.error(msg, cause); 259 complete = true; 260 if (cause instanceof ForeignException) { 261 monitor.receive((ForeignException) cause); 262 } else { 263 monitor.receive(new ForeignException(getMemberName(), cause)); 264 } 265 } 266 267 /** 268 * Callback for the member rpcs to call when the global barrier has been reached. This unblocks 269 * the main subprocedure exectuion thread so that the Subprocedure's {@link #insideBarrier()} 270 * method can be run. 271 */ 272 public void receiveReachedGlobalBarrier() { 273 inGlobalBarrier.countDown(); 274 } 275 276 // 277 // Subprocedure Internal State interface 278 // 279 280 /** 281 * Wait for the reached global barrier notification. Package visibility for testing 282 */ 283 void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException { 284 Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency, 285 barrierName + ":remote acquired"); 286 } 287 288 /** 289 * Waits until the entire procedure has globally completed, or has been aborted. 290 */ 291 public void waitForLocallyCompleted() throws ForeignException, InterruptedException { 292 Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency, 293 barrierName + ":completed"); 294 } 295 296 /** 297 * Empty Subprocedure for testing. Must be public for stubbing used in testing to work. 298 */ 299 public static class SubprocedureImpl extends Subprocedure { 300 301 public SubprocedureImpl(ProcedureMember member, String opName, 302 ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) { 303 super(member, opName, monitor, wakeFrequency, timeout); 304 } 305 306 @Override 307 public void acquireBarrier() throws ForeignException { 308 } 309 310 @Override 311 public byte[] insideBarrier() throws ForeignException { 312 return new byte[0]; 313 } 314 315 @Override 316 public void cleanup(Exception e) { 317 } 318 }; 319}