001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.util.Collections; 021import java.util.Map; 022import java.util.Set; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.hbase.conf.ConfigurationObserver; 028import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; 029import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * This provides an class for maintaining a set of peer clusters. These peers are remote slave 036 * clusters that data is replicated to. 037 * <p> 038 * We implement {@link ConfigurationObserver} mainly for recreating the 039 * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without 040 * restarting the region server. 041 */ 042@InterfaceAudience.Private 043public class ReplicationPeers implements ConfigurationObserver { 044 045 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class); 046 047 private volatile Configuration conf; 048 049 // Map of peer clusters keyed by their id 050 private final ConcurrentMap<String, ReplicationPeerImpl> peerCache; 051 private final FileSystem fs; 052 private final ZKWatcher zookeeper; 053 private volatile ReplicationPeerStorage peerStorage; 054 055 ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) { 056 this.conf = conf; 057 this.fs = fs; 058 this.zookeeper = zookeeper; 059 this.peerCache = new ConcurrentHashMap<>(); 060 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); 061 } 062 063 public Configuration getConf() { 064 return conf; 065 } 066 067 public void init() throws ReplicationException { 068 // Loading all existing peerIds into peer cache. 069 for (String peerId : this.peerStorage.listPeerIds()) { 070 addPeer(peerId); 071 } 072 } 073 074 public ReplicationPeerStorage getPeerStorage() { 075 return this.peerStorage; 076 } 077 078 /** 079 * Method called after a peer has been connected. It will create a ReplicationPeer to track the 080 * newly connected cluster. 081 * @param peerId a short that identifies the cluster 082 * @return whether a ReplicationPeer was successfully created 083 */ 084 public boolean addPeer(String peerId) throws ReplicationException { 085 if (this.peerCache.containsKey(peerId)) { 086 return false; 087 } 088 089 peerCache.put(peerId, createPeer(peerId)); 090 return true; 091 } 092 093 public void removePeer(String peerId) { 094 peerCache.remove(peerId); 095 } 096 097 /** 098 * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will 099 * continue to track changes to the Peer's state and config. This method returns null if no peer 100 * has been cached with the given peerId. 101 * @param peerId id for the peer 102 * @return ReplicationPeer object 103 */ 104 public ReplicationPeerImpl getPeer(String peerId) { 105 return peerCache.get(peerId); 106 } 107 108 /** 109 * Returns the set of peerIds of the clusters that have been connected and have an underlying 110 * ReplicationPeer. 111 * @return a Set of Strings for peerIds 112 */ 113 public Set<String> getAllPeerIds() { 114 return Collections.unmodifiableSet(peerCache.keySet()); 115 } 116 117 public Map<String, ReplicationPeerImpl> getPeerCache() { 118 return Collections.unmodifiableMap(peerCache); 119 } 120 121 public PeerState refreshPeerState(String peerId) throws ReplicationException { 122 ReplicationPeerImpl peer = peerCache.get(peerId); 123 if (peer == null) { 124 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 125 } 126 peer.setPeerState(peerStorage.isPeerEnabled(peerId)); 127 return peer.getPeerState(); 128 } 129 130 public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { 131 ReplicationPeerImpl peer = peerCache.get(peerId); 132 if (peer == null) { 133 throw new ReplicationException("Peer with id=" + peerId + " is not cached."); 134 } 135 peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); 136 return peer.getPeerConfig(); 137 } 138 139 /** 140 * Helper method to connect to a peer 141 * @param peerId peer's identifier 142 * @return object representing the peer 143 */ 144 private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { 145 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 146 boolean enabled = peerStorage.isPeerEnabled(peerId); 147 return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf), 148 peerId, enabled, peerConfig); 149 } 150 151 @Override 152 public void onConfigurationChange(Configuration conf) { 153 this.conf = conf; 154 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf); 155 for (ReplicationPeerImpl peer : peerCache.values()) { 156 try { 157 peer.onConfigurationChange( 158 ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf)); 159 } catch (ReplicationException e) { 160 LOG.warn("failed to reload configuration for peer {}", peer.getId(), e); 161 } 162 } 163 } 164}