001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import org.apache.hadoop.hbase.ServerName; 022import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; 023import org.apache.hadoop.hbase.procedure2.Procedure; 024import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 025import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 026import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 027import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 028import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033@InterfaceAudience.Private 034/** 035 * The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote) 036 * RegionServer; e.g. asking a RegionServer to split a WAL file as a sub-procedure of the 037 * ServerCrashProcedure recovery process. 038 * <p> 039 * To implement a new Procedure type, extend this class and override remoteCallBuild() and 040 * complete(). The dispatch and callback will be handled for you here, internally. 041 * <p> 042 * The Procedure works as follows. It uses {@link RSProcedureDispatcher}, the same system used 043 * dispatching Region OPEN and CLOSE RPCs, to pass a Callable to a RegionServer. Examples include 044 * {@link org.apache.hadoop.hbase.regionserver.SplitWALCallable} and 045 * {@link org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable}. Rather than 046 * assign/unassign, the Master calls #executeProcedures against the remote RegionServer wrapping a 047 * Callable in a {@link ExecuteProceduresRequest}. Upon successful dispatch, the Procedure then 048 * suspends itself on the Master-side and relinqushes its executor worker. On receipt, the 049 * RegionServer submits the Callable to its executor service. When the Callable completes, it adds 050 * itself to a queue on the RegionServer side for processing by a background thread, the 051 * {@link RemoteProcedureResultReporter}. It picks up the completed Callable from the queue and RPCs 052 * the master at #reportProcedureDone with the procedure id and whether success or failure. The 053 * master calls complete() setting success or failure state and then reschedules the suspended 054 * Procedure so it can finish. 055 * <p> 056 * Here are some details on operation: 057 * <p> 058 * If adding the operation to the dispatcher fails, addOperationToNode will throw 059 * FailedRemoteDispatchException, and this Procedure will return 'null'. The Procedure Executor will 060 * then mark this procedure as 'complete' (though we failed to dispatch our task). In this case, the 061 * upper layer of this procedure must have a way to check if this Procedure really succeeded or not 062 * and have appropriate handling. 063 * <p> 064 * If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle 065 * this which calls remoteOperationDone with the exception. If the targetServer crashed but this 066 * procedure has no response, than dispatcher will call remoteOperationFailed() which also calls 067 * remoteOperationDone with the exception. If the operation is successful, then 068 * remoteOperationCompleted will be called and actually calls the remoteOperationDone without 069 * exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others. 070 * Then developer could implement complete() based on their own purpose. But basic logic is that if 071 * operation succeed, set succ to true and do the clean work. If operation failed and require to 072 * resend it to the same server, leave the succ as false. If operation failed and require to resend 073 * it to another server, set succ to true and upper layer should be able to find out this operation 074 * not work and send a operation to another server. 075 */ 076public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv> 077 implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> { 078 protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class); 079 protected ProcedureEvent<?> event; 080 protected ServerName targetServer; 081 protected boolean dispatched; 082 protected boolean succ; 083 084 protected abstract void complete(MasterProcedureEnv env, Throwable error); 085 086 @Override 087 protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) 088 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 089 if (dispatched) { 090 if (succ) { 091 return null; 092 } 093 dispatched = false; 094 } 095 try { 096 env.getRemoteDispatcher().addOperationToNode(targetServer, this); 097 } catch (FailedRemoteDispatchException frde) { 098 LOG.warn("Can not send remote operation {} to {}, this operation will " 099 + "be retried to send to another server", this.getProcId(), targetServer); 100 return null; 101 } 102 dispatched = true; 103 event = new ProcedureEvent<>(this); 104 event.suspendIfNotReady(this); 105 throw new ProcedureSuspendedException(); 106 } 107 108 @Override 109 protected synchronized void completionCleanup(MasterProcedureEnv env) { 110 env.getRemoteDispatcher().removeCompletedOperation(targetServer, this); 111 } 112 113 @Override 114 public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, 115 IOException exception) { 116 remoteOperationDone(env, exception); 117 } 118 119 @Override 120 public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { 121 remoteOperationDone(env, null); 122 } 123 124 @Override 125 public synchronized void remoteOperationFailed(MasterProcedureEnv env, 126 RemoteProcedureException error) { 127 remoteOperationDone(env, error); 128 } 129 130 synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { 131 if (this.isFinished()) { 132 LOG.info("This procedure {} is already finished, skip the rest processes", this.getProcId()); 133 return; 134 } 135 if (event == null) { 136 LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", 137 getProcId()); 138 return; 139 } 140 complete(env, error); 141 event.wake(env.getProcedureScheduler()); 142 event = null; 143 } 144}