/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

/**
 * Manages and performs all replication admin operations.
 * <p>
 * Used to add/remove a replication peer.
 * <p>
 * Implements {@link ConfigurationObserver} mainly so that the {@link ReplicationPeerStorage} can be
 * recreated on configuration change, which allows migrating across different replication peer
 * storage implementations without restarting the master.
 */
@InterfaceAudience.Private
public class ReplicationPeerManager implements ConfigurationObserver {
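
  // A minimal usage sketch (illustrative only; peerConfig and the surrounding setup are assumed to
  // be provided by the caller, typically the master-side peer procedures in this package):
  //
  //   ReplicationPeerManager manager = ReplicationPeerManager.create(fs, zk, conf, clusterId);
  //   manager.preAddPeer("1", peerConfig); // validation, may throw DoNotRetryIOException
  //   manager.addPeer("1", peerConfig, true); // persist to peer storage and cache in memory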

  private volatile ReplicationPeerStorage peerStorage;

  private final ReplicationQueueStorage queueStorage;

  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  private final String clusterId;

  private volatile Configuration conf;

  // for dynamically recreating the ReplicationPeerStorage.
  private final FileSystem fs;

  private final ZKWatcher zk;

  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
    Configuration conf, String clusterId) {
    this.fs = fs;
    this.zk = zk;
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
    this.conf = conf;
    this.clusterId = clusterId;
  }

  private void checkQueuesDeleted(String peerId)
    throws ReplicationException, DoNotRetryIOException {
    for (ServerName replicator : queueStorage.getListOfReplicators()) {
      List<String> queueIds = queueStorage.getAllQueues(replicator);
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        if (queueInfo.getPeerId().equals(peerId)) {
          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + ", replicator: "
            + replicator + ", queueId: " + queueId);
        }
      }
    }
    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
    }
  }

  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException, ReplicationException {
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }
    // make sure that there are no queues with the same peer id. This may happen when we create a
    // peer with the same id as an old deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
    // files may also be replicated.
    checkQueuesDeleted(peerId);
  }
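
  // A minimal sketch of why peer ids containing '-' are rejected in preAddPeer above: recovered
  // replication queue ids embed the peer id plus the dead server names, joined with '-', so a '-'
  // in the peer id itself would make the queue id ambiguous to parse. For example (illustrative
  // values only):
  //
  //   ReplicationQueueInfo info = new ReplicationQueueInfo("1-server1,16020,1234567890");
  //   info.getPeerId();         // "1"
  //   info.isQueueRecovered();  // true, the queue was claimed from a dead region server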

  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc == null) {
      throw new ReplicationPeerNotFoundException(peerId);
    }
    return desc;
  }

  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
    return checkPeerExists(peerId).getPeerConfig();
  }

  void preEnablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
    }
  }

  void preDisablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
    }
  }

  /**
   * Return the old peer description. Can never be null.
   */
  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    checkPeerConfig(peerConfig);
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
      throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '"
          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
          + peerConfig.getClusterKey() + "'");
    }

    if (
      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
        oldPeerConfig.getReplicationEndpointImpl())
    ) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
        + "on an existing peer is not allowed. Existing class '"
        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
    }
    return desc;
  }

  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
    throws ReplicationException {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
    peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
  }

  public void removePeer(String peerId) throws ReplicationException {
    if (!peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }

  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.isEnabled() == enabled) {
      // this should be a retry, just return
      return;
    }
    peerStorage.setPeerState(peerId, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
  }

  public boolean getPeerState(String peerId) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc != null) {
      return desc.isEnabled();
    } else {
      throw new ReplicationException("Replication peer " + peerId + " does not exist.");
    }
  }

  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }

  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }

  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    // the checking rules are too complicated here so we give up checking whether this is a retry.
    ReplicationPeerDescription desc = peers.get(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    ReplicationPeerConfigBuilder newPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(peerConfig);
    // we need to use the new conf to overwrite the old one.
    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
    peerStorage.updatePeerConfig(peerId, newPeerConfig);
    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
  }
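
  // A minimal sketch of the merge order used in updatePeerConfig above (hypothetical keys): the
  // old configuration is copied first and the new one second, so values from the new config win
  // while keys that only exist in the old config are preserved.
  //
  //   // old config: {"k1" -> "a", "k2" -> "b"}, new config: {"k2" -> "c"}
  //   // merged config after both putAllConfiguration calls: {"k1" -> "a", "k2" -> "c"}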

  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
    if (pattern == null) {
      return new ArrayList<>(peers.values());
    }
    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
      .collect(Collectors.toList());
  }

  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
    ReplicationPeerDescription desc = peers.get(peerId);
    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
  }

  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }

  void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    // Here we need two passes to address the problem of claimQueue. A claimQueue may still be
    // on-going when the refresh peer config procedure is done: if an RS which has already been
    // scanned claims the queue of an RS which has not been scanned yet, we will miss that queue in
    // the scan here, and if the RS which claimed the queue crashes before creating the recovered
    // source, then the queue will be left there until another RS detects the crash and helps
    // remove the queue.
    // A two-pass scan solves the problem. The queue will not disappear during the claiming; it
    // will be either under the old RS or under the new RS, and a queue can only be claimed once
    // after the refresh peer procedure is done (as the next claimQueue will just delete it), so a
    // two-pass scan will finally find the queue and remove it, unless it has already been removed
    // by others.
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }

  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    ReplicationEndpoint endpoint = null;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      try {
        // try creating an instance
        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
          .getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
    }
    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
    if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) {
      checkClusterKey(peerConfig.getClusterKey());
      // Check if endpoint can replicate to the same cluster
      if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
        checkSameClusterKey(peerConfig.getClusterKey());
      }
    }

    if (peerConfig.replicateAllUserTables()) {
      // If the replicate_all flag is true, all user tables will be replicated to the peer cluster.
      // In that case only exclude-namespaces or exclude-table-cfs, i.e. the tables that will not
      // be replicated to the peer cluster, may be configured.
      if (
        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException("Need to clean the namespaces or table-cfs config first "
          + "when you want to replicate the whole cluster");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If the replicate_all flag is false, user tables are not replicated to the peer cluster
      // unless listed. In that case only namespaces or table-cfs, i.e. the tables that will be
      // replicated to the peer cluster, may be configured.
      if (
        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
          || (peerConfig.getExcludeTableCFsMap() != null
            && !peerConfig.getExcludeTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException(
          "Need to clean the exclude-namespaces or exclude-table-cfs config first"
            + " when the replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }

  /**
   * Setting a namespace in the peer config means that all tables in this namespace will be
   * replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already has a namespace, then adding any table of this namespace to the
   * peer config is not allowed.</li>
   * <li>If the peer config already has a table, then adding this table's namespace to the peer
   * config is not allowed.</li>
   * </ol>
   * <p>
   * Setting an exclude namespace in the peer config means that no table in this namespace can be
   * replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already has an exclude namespace, then adding any exclude table of this
   * namespace to the peer config is not allowed.</li>
   * <li>If the peer config already has an exclude table, then adding this table's namespace as an
   * exclude namespace is not allowed.</li>
   * </ol>
   */
  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
    if (namespaces == null || namespaces.isEmpty()) {
      return;
    }
    if (tableCfs == null || tableCfs.isEmpty()) {
      return;
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (namespaces.contains(table.getNamespaceAsString())) {
        throw new DoNotRetryIOException("Table-cfs " + table + " conflicts with namespace "
          + table.getNamespaceAsString() + " in peer config");
      }
    }
  }
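
  // A minimal sketch of a peer config that the conflict check above rejects (hypothetical
  // namespace and table names): the namespace "ns1" already covers every table in it, so listing
  // "ns1:t1" in table-cfs as well is ambiguous and checkPeerConfig fails.
  //
  //   ReplicationPeerConfig conflicting = ReplicationPeerConfig.newBuilder()
  //     .setReplicateAllUserTables(false)
  //     .setNamespaces(Collections.singleton("ns1"))
  //     .setTableCFsMap(Collections.singletonMap(TableName.valueOf("ns1:t1"), null))
  //     .build();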

  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    String filterCSV = peerConfig.getConfiguration()
      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
    if (filterCSV != null && !filterCSV.isEmpty()) {
      String[] filters = filterCSV.split(",");
      for (String filter : filters) {
        try {
          Class.forName(filter).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
            + " could not be created. Failing add/update peer operation.", e);
        }
      }
    }
  }

  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
    try {
      ZKConfig.validateClusterKey(clusterKey);
    } catch (IOException e) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
  }
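
  // For reference, the cluster key validated above is expected to have the form
  // "<hbase.zookeeper.quorum>:<hbase.zookeeper.property.clientPort>:<zookeeper.znode.parent>",
  // for example (hypothetical hosts):
  //
  //   "zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"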

  private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
    String peerClusterId = "";
    try {
      // Create the peer cluster config to get the peer cluster id
      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
      }
    } catch (IOException | KeeperException e) {
      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
    }
    // In rare cases, the zookeeper setting may be messed up, which leads to an incorrect
    // peerClusterId value that is the same as the source clusterId.
    if (clusterId.equals(peerClusterId)) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
    }
  }

  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
      .collect(Collectors.toList());
  }

  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configuration conf,
    String clusterId) throws ReplicationException {
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);

      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
      peerStorage.updatePeerConfig(peerId, peerConfig);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
    }
    return new ReplicationPeerManager(fs, zk, peerStorage,
      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
  }

  /**
   * For a replication peer cluster key or endpoint class, null and an empty string are treated as
   * the same, so do not use {@link StringUtils#equals(CharSequence, CharSequence)} directly here.
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
  }
}