/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData;

/**
 * A remote procedure which is used to send the work of replaying remote WALs to a region server.
 * <p>
 * The procedure carries the replication peer id and the list of WALs to replay. Once the remote
 * call has completed without error, the WALs are marked as finished one by one through the
 * {@code SyncReplicationReplayWALManager} obtained from the master services.
 */
@InterfaceAudience.Private
public class SyncReplicationReplayWALRemoteProcedure extends ServerRemoteProcedure
  implements PeerProcedureInterface {

  private static final Logger LOG =
    LoggerFactory.getLogger(SyncReplicationReplayWALRemoteProcedure.class);

  /** Id of the replication peer whose WALs are replayed. */
  private String peerId;

  /** The WALs to replay; the first entry is the one the parent procedure inspects. */
  private List<String> wals;

  /**
   * No-arg constructor; the fields are populated afterwards by
   * {@link #deserializeStateData(ProcedureStateSerializer)}.
   */
  public SyncReplicationReplayWALRemoteProcedure() {
  }

  /**
   * Creates a procedure that replays the given WALs on the given server.
   * @param peerId       id of the replication peer the WALs belong to
   * @param wals         the WALs to replay; the list is stored as-is, not copied
   * @param targetServer the server that should perform the replay
   */
  public SyncReplicationReplayWALRemoteProcedure(String peerId, List<String> wals,
    ServerName targetServer) {
    this.peerId = peerId;
    this.wals = wals;
    this.targetServer = targetServer;
  }

  /**
   * Builds the remote operation describing the replay work.
   * <p>
   * The peer id and every WAL are packed into a {@link ReplaySyncReplicationWALParameter} whose
   * serialized bytes are handed to a {@link ServerOperation} targeting
   * {@link ReplaySyncReplicationWALCallable}.
   * @param env    the master procedure environment, used to read the master active time
   * @param remote the remote server; not used by this implementation
   * @return an {@link Optional} that always contains the built operation
   */
  @Override
  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
    ReplaySyncReplicationWALParameter.Builder builder =
      ReplaySyncReplicationWALParameter.newBuilder();
    builder.setPeerId(peerId);
    wals.forEach(builder::addWal);
    return Optional
      .of(new ServerOperation(this, getProcId(), ReplaySyncReplicationWALCallable.class,
        builder.build().toByteArray(), env.getMasterServices().getMasterActiveTime()));
  }

  /**
   * Handles the outcome of the remote replay.
   * <p>
   * On failure only a warning is logged. On success the WALs are truncated before the success is
   * logged.
   * @param env   the master procedure environment
   * @param error the error reported for the remote call, or {@code null} if it succeeded
   * @return {@code true} if {@code error} is {@code null}, {@code false} otherwise
   */
  protected boolean complete(MasterProcedureEnv env, Throwable error) {
    if (error != null) {
      LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
      return false;
    } else {
      truncateWALs(env);
      LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
      return true;
    }
  }

  /**
   * Truncates the WALs one by one; only called when the replay task has succeeded.
   * <p>
   * The parent procedure checks the length of the first WAL to know whether this task succeeded,
   * so the first WAL is handled specially: if it cannot be truncated, nothing else is truncated
   * and the method returns. Each remaining WAL is retried once, and a second failure is only
   * logged.
   * @param env the master procedure environment
   */
  private void truncateWALs(MasterProcedureEnv env) {
    if (wals.isEmpty()) {
      // Nothing was replayed, so there is nothing to truncate; avoids wals.get(0) throwing.
      return;
    }
    String firstWal = wals.get(0);
    try {
      finishReplayWAL(env, firstWal);
    } catch (IOException e) {
      // It is idempotent to rerun this task, so just ignore this exception and return.
      LOG.warn("Failed to truncate wal {} for peer id={}", firstWal, peerId, e);
      return;
    }
    for (String wal : wals.subList(1, wals.size())) {
      try {
        finishReplayWAL(env, wal);
      } catch (IOException e1) {
        try {
          // Retry once.
          finishReplayWAL(env, wal);
        } catch (IOException e2) {
          // The parent procedure only checks the first wal length, so just ignore this exception.
          LOG.warn("Failed to truncate wal {} for peer id={}", wal, peerId, e2);
        }
      }
    }
  }

  /**
   * Asks the sync replication replay WAL manager of the master to finish replaying one WAL.
   * @param env the master procedure environment
   * @param wal the WAL to finish
   * @throws IOException if the manager fails to finish the WAL
   */
  private void finishReplayWAL(MasterProcedureEnv env, String wal) throws IOException {
    env.getMasterServices().getSyncReplicationReplayWALManager().finishReplayWAL(wal);
  }

  /**
   * Rollback is not supported for this procedure.
   * @throws UnsupportedOperationException always
   */
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  /**
   * Abort is not supported for this procedure.
   * @return always {@code false}
   */
  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

  /**
   * Serializes the peer id, target server, state, WAL list and, if present, the remote error.
   * <p>
   * {@code targetServer}, {@code state} and {@code remoteError} are not declared in this class;
   * presumably they are inherited from {@link ServerRemoteProcedure}.
   * @param serializer the serializer that receives the state message
   * @throws IOException if the serializer fails
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    SyncReplicationReplayWALRemoteStateData.Builder builder =
      SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId)
        .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
    wals.forEach(builder::addWal);
    if (this.remoteError != null) {
      ErrorHandlingProtos.ForeignExceptionMessage fem =
        ForeignExceptionUtil.toProtoForeignException(remoteError);
      builder.setError(fem);
    }
    serializer.serialize(builder.build());
  }

  /**
   * Restores the fields written by {@link #serializeStateData(ProcedureStateSerializer)}.
   * <p>
   * The WAL list is copied into a fresh {@link ArrayList}. The remote error is only restored when
   * the serialized data carries one.
   * @param serializer the serializer to read the state message from
   * @throws IOException if the serializer fails
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    SyncReplicationReplayWALRemoteStateData data =
      serializer.deserialize(SyncReplicationReplayWALRemoteStateData.class);
    peerId = data.getPeerId();
    wals = new ArrayList<>(data.getWalList());
    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
    state = data.getState();
    if (data.hasError()) {
      this.remoteError = ForeignExceptionUtil.toException(data.getError());
    }
  }

  /** Returns the id of the replication peer this procedure works for. */
  @Override
  public String getPeerId() {
    return peerId;
  }

  /** Returns {@link PeerOperationType#SYNC_REPLICATION_REPLAY_WAL_REMOTE}. */
  @Override
  public PeerOperationType getPeerOperationType() {
    return PeerOperationType.SYNC_REPLICATION_REPLAY_WAL_REMOTE;
  }
}