/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;

/**
 * Remote procedure that asks {@code targetServer} to claim one replication queue which belonged to
 * a crashed server. The queue is identified by a {@link ReplicationQueueId}; the request sent to
 * the target server is handled there by {@link ClaimReplicationQueueCallable}.
 */
@InterfaceAudience.Private
public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
  implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {

  private static final Logger LOG =
    LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);

  /** The queue to claim; its server name is the crashed server that owned the queue. */
  private ReplicationQueueId queueId;

  /** No-arg constructor; the fields are filled in later by {@link #deserializeStateData}. */
  public ClaimReplicationQueueRemoteProcedure() {
  }

  /**
   * @param queueId      the replication queue to claim
   * @param targetServer the server which is asked to claim the queue
   */
  public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerName targetServer) {
    this.targetServer = targetServer;
    this.queueId = queueId;
  }

  /**
   * Checks whether ReplicationSyncUp has already done the work for us. This is detected by the
   * existence of an entry named after the crashed server under the
   * {@link ReplicationSyncUp#INFO_DIR} directory below the master root dir. If it exists, claiming
   * the replication queue should be skipped.
   * @return {@code true} if the entry for the crashed server exists
   * @throws IOException if the file system check fails
   */
  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
    MasterFileSystem masterFs = env.getMasterFileSystem();
    Path infoDir = new Path(masterFs.getRootDir(), ReplicationSyncUp.INFO_DIR);
    Path serverMarker = new Path(infoDir, getServerName().getServerName());
    return masterFs.getFileSystem().exists(serverMarker);
  }

  /**
   * Finishes immediately, without contacting the target server, when replication sync up has
   * already handled the crashed server or when that cannot be determined; otherwise delegates to
   * the superclass implementation.
   * @return {@code null} when the procedure finishes here, otherwise whatever the superclass
   *         returns
   */
  @Override
  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    final boolean alreadySyncedUp;
    try {
      alreadySyncedUp = shouldSkip(env);
    } catch (IOException ioe) {
      LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
        getServerName(), ioe);
      // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
      return null;
    }
    if (alreadySyncedUp) {
      LOG.info("Skip claiming {} because replication sync up has already done it for us",
        getServerName());
      return null;
    }
    return super.execute(env);
  }

  /**
   * Builds the operation to be sent to the target server. The payload is a serialized
   * {@link ClaimReplicationQueueRemoteParameter} carrying the crashed server, the peer id and, if
   * present, the source server of the queue.
   */
  @Override
  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
    assert targetServer.equals(remote);
    ClaimReplicationQueueRemoteParameter.Builder request =
      ClaimReplicationQueueRemoteParameter.newBuilder();
    request.setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()));
    request.setQueue(queueId.getPeerId());
    queueId.getSourceServerName()
      .ifPresent(source -> request.setSourceServer(ProtobufUtil.toServerName(source)));
    byte[] payload = request.build().toByteArray();
    RemoteOperation operation = new ServerOperation(this, getProcId(),
      ClaimReplicationQueueCallable.class, payload, env.getMasterServices().getMasterActiveTime());
    return Optional.of(operation);
  }

  /**
   * @return the crashed server which owned the queue, not the target server
   */
  @Override
  public ServerName getServerName() {
    // return crashed server here, as we are going to recover its replication queues so we should
    // use its scheduler queue instead of the one for the target server.
    return queueId.getServerName();
  }

  /** @return always {@code false} */
  @Override
  public boolean hasMetaTableRegion() {
    return false;
  }

  /** @return {@link ServerOperationType#CLAIM_REPLICATION_QUEUE_REMOTE} */
  @Override
  public ServerOperationType getServerOperationType() {
    return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
  }

  /**
   * Reports whether the remote call succeeded, logging the failure if there is one.
   * @param error the failure of the remote call, or {@code null} if there was none
   * @return {@code true} if {@code error} is {@code null}, {@code false} otherwise
   */
  @Override
  protected boolean complete(MasterProcedureEnv env, Throwable error) {
    if (error == null) {
      return true;
    }
    LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error);
    return false;
  }

  /** Rollback is not supported by this procedure. */
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  /** @return always {@code false} */
  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

  /** Delegates to {@link MasterProcedureEnv#waitInitialized}. */
  @Override
  protected boolean waitInitialized(MasterProcedureEnv env) {
    return env.waitInitialized(this);
  }

  /**
   * Persists the crashed server, peer id, target server and state, plus the remote error and the
   * source server of the queue when they are present.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    ClaimReplicationQueueRemoteStateData.Builder stateData =
      ClaimReplicationQueueRemoteStateData.newBuilder();
    stateData.setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()));
    stateData.setQueue(queueId.getPeerId());
    stateData.setTargetServer(ProtobufUtil.toServerName(targetServer));
    stateData.setState(state);
    if (this.remoteError != null) {
      ErrorHandlingProtos.ForeignExceptionMessage errorMessage =
        ForeignExceptionUtil.toProtoForeignException(remoteError);
      stateData.setError(errorMessage);
    }
    queueId.getSourceServerName()
      .ifPresent(source -> stateData.setSourceServer(ProtobufUtil.toServerName(source)));
    serializer.serialize(stateData.build());
  }

  /**
   * Restores the fields written by {@link #serializeStateData}. The queue id is rebuilt with or
   * without a source server depending on whether one was persisted.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    ClaimReplicationQueueRemoteStateData stateData =
      serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
    targetServer = ProtobufUtil.toServerName(stateData.getTargetServer());
    state = stateData.getState();
    ServerName crashed = ProtobufUtil.toServerName(stateData.getCrashedServer());
    String peerId = stateData.getQueue();
    queueId = stateData.hasSourceServer()
      ? new ReplicationQueueId(crashed, peerId,
        ProtobufUtil.toServerName(stateData.getSourceServer()))
      : new ReplicationQueueId(crashed, peerId);
    if (stateData.hasError()) {
      this.remoteError = ForeignExceptionUtil.toException(stateData.getError());
    }
  }
}