001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.Collections;
021import java.util.Map;
022import java.util.Set;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.hbase.conf.ConfigurationObserver;
028import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
029import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * This provides an class for maintaining a set of peer clusters. These peers are remote slave
036 * clusters that data is replicated to.
037 * <p>
038 * We implement {@link ConfigurationObserver} mainly for recreating the
039 * {@link ReplicationPeerStorage}, so we can change the {@link ReplicationPeerStorage} without
040 * restarting the region server.
041 */
042@InterfaceAudience.Private
043public class ReplicationPeers implements ConfigurationObserver {
044
045  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
046
047  private volatile Configuration conf;
048
049  // Map of peer clusters keyed by their id
050  private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
051  private final FileSystem fs;
052  private final ZKWatcher zookeeper;
053  private volatile ReplicationPeerStorage peerStorage;
054
055  ReplicationPeers(FileSystem fs, ZKWatcher zookeeper, Configuration conf) {
056    this.conf = conf;
057    this.fs = fs;
058    this.zookeeper = zookeeper;
059    this.peerCache = new ConcurrentHashMap<>();
060    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
061  }
062
063  public Configuration getConf() {
064    return conf;
065  }
066
067  public void init() throws ReplicationException {
068    // Loading all existing peerIds into peer cache.
069    for (String peerId : this.peerStorage.listPeerIds()) {
070      addPeer(peerId);
071    }
072  }
073
074  public ReplicationPeerStorage getPeerStorage() {
075    return this.peerStorage;
076  }
077
078  /**
079   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
080   * newly connected cluster.
081   * @param peerId a short that identifies the cluster
082   * @return whether a ReplicationPeer was successfully created
083   */
084  public boolean addPeer(String peerId) throws ReplicationException {
085    if (this.peerCache.containsKey(peerId)) {
086      return false;
087    }
088
089    peerCache.put(peerId, createPeer(peerId));
090    return true;
091  }
092
093  public void removePeer(String peerId) {
094    peerCache.remove(peerId);
095  }
096
097  /**
098   * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
099   * continue to track changes to the Peer's state and config. This method returns null if no peer
100   * has been cached with the given peerId.
101   * @param peerId id for the peer
102   * @return ReplicationPeer object
103   */
104  public ReplicationPeerImpl getPeer(String peerId) {
105    return peerCache.get(peerId);
106  }
107
108  /**
109   * Returns the set of peerIds of the clusters that have been connected and have an underlying
110   * ReplicationPeer.
111   * @return a Set of Strings for peerIds
112   */
113  public Set<String> getAllPeerIds() {
114    return Collections.unmodifiableSet(peerCache.keySet());
115  }
116
117  public Map<String, ReplicationPeerImpl> getPeerCache() {
118    return Collections.unmodifiableMap(peerCache);
119  }
120
121  public PeerState refreshPeerState(String peerId) throws ReplicationException {
122    ReplicationPeerImpl peer = peerCache.get(peerId);
123    if (peer == null) {
124      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
125    }
126    peer.setPeerState(peerStorage.isPeerEnabled(peerId));
127    return peer.getPeerState();
128  }
129
130  public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
131    ReplicationPeerImpl peer = peerCache.get(peerId);
132    if (peer == null) {
133      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
134    }
135    peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
136    return peer.getPeerConfig();
137  }
138
139  /**
140   * Helper method to connect to a peer
141   * @param peerId peer's identifier
142   * @return object representing the peer
143   */
144  private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
145    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
146    boolean enabled = peerStorage.isPeerEnabled(peerId);
147    return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
148      peerId, enabled, peerConfig);
149  }
150
151  @Override
152  public void onConfigurationChange(Configuration conf) {
153    this.conf = conf;
154    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zookeeper, conf);
155    for (ReplicationPeerImpl peer : peerCache.values()) {
156      try {
157        peer.onConfigurationChange(
158          ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf));
159      } catch (ReplicationException e) {
160        LOG.warn("failed to reload configuration for peer {}", peer.getId(), e);
161      }
162    }
163  }
164}