001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.locks.Lock; 022import org.apache.hadoop.hbase.ServerName; 023import org.apache.hadoop.hbase.replication.ReplicationException; 024import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 025import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 026import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; 027import org.apache.hadoop.hbase.replication.ReplicationPeers; 028import org.apache.hadoop.hbase.replication.ReplicationUtils; 029import org.apache.hadoop.hbase.util.KeyLocker; 030import org.apache.yetus.audience.InterfaceAudience; 031 032@InterfaceAudience.Private 033public class PeerProcedureHandlerImpl implements PeerProcedureHandler { 034 035 private final ReplicationSourceManager replicationSourceManager; 036 private final KeyLocker<String> peersLock = new KeyLocker<>(); 037 038 public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { 039 this.replicationSourceManager = replicationSourceManager; 040 } 041 042 @Override 043 public void addPeer(String peerId) throws IOException { 044 Lock peerLock = peersLock.acquireLock(peerId); 045 try { 046 replicationSourceManager.addPeer(peerId); 047 } finally { 048 peerLock.unlock(); 049 } 050 } 051 052 @Override 053 public void removePeer(String peerId) throws IOException { 054 Lock peerLock = peersLock.acquireLock(peerId); 055 try { 056 if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) { 057 replicationSourceManager.removePeer(peerId); 058 } 059 } finally { 060 peerLock.unlock(); 061 } 062 } 063 064 private void refreshPeerState(String peerId) throws ReplicationException, IOException { 065 PeerState newState; 066 Lock peerLock = peersLock.acquireLock(peerId); 067 ReplicationPeerImpl peer = null; 068 PeerState oldState = null; 069 boolean success = false; 070 try { 071 peer = replicationSourceManager.getReplicationPeers().getPeer(peerId); 072 if (peer == null) { 073 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 074 } 075 oldState = peer.getPeerState(); 076 newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); 077 // RS need to start work with the new replication state change 078 if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) { 079 replicationSourceManager.refreshSources(peerId); 080 } 081 success = true; 082 } finally { 083 if (!success && peer != null) { 084 // Reset peer state if refresh source failed 085 peer.setPeerState(oldState.equals(PeerState.ENABLED)); 086 } 087 peerLock.unlock(); 088 } 089 } 090 091 @Override 092 public void enablePeer(String peerId) throws ReplicationException, IOException { 093 refreshPeerState(peerId); 094 } 095 096 @Override 097 public void disablePeer(String peerId) throws ReplicationException, IOException { 098 refreshPeerState(peerId); 099 } 100 101 @Override 102 public void updatePeerConfig(String peerId) throws ReplicationException, IOException { 103 Lock peerLock = peersLock.acquireLock(peerId); 104 ReplicationPeers peers = replicationSourceManager.getReplicationPeers(); 105 ReplicationPeerImpl peer = null; 106 ReplicationPeerConfig oldConfig = null; 107 PeerState oldState = null; 108 boolean success = false; 109 try { 110 peer = peers.getPeer(peerId); 111 if (peer == null) { 112 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 113 } 114 oldConfig = peer.getPeerConfig(); 115 oldState = peer.getPeerState(); 116 ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId); 117 // also need to refresh peer state here. When updating a serial replication peer we may 118 // disable it first and then enable it. 119 PeerState newState = peers.refreshPeerState(peerId); 120 // RS need to start work with the new replication config change 121 if ( 122 !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) 123 || oldConfig.isSerial() != newConfig.isSerial() 124 || (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) 125 ) { 126 replicationSourceManager.refreshSources(peerId); 127 } 128 success = true; 129 } finally { 130 if (!success && peer != null) { 131 // Reset peer config if refresh source failed 132 peer.setPeerConfig(oldConfig); 133 peer.setPeerState(oldState.equals(PeerState.ENABLED)); 134 } 135 peerLock.unlock(); 136 } 137 } 138 139 @Override 140 public void claimReplicationQueue(ServerName crashedServer, String queue) 141 throws ReplicationException, IOException { 142 replicationSourceManager.claimQueue(crashedServer, queue); 143 } 144}