001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.net.URI;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.TimeUnit;
036import java.util.regex.Pattern;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.ClusterMetrics;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.Admin;
049import org.apache.hadoop.hbase.client.Connection;
050import org.apache.hadoop.hbase.client.ConnectionFactory;
051import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
052import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
053import org.apache.hadoop.hbase.conf.ConfigurationObserver;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
056import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
058import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
059import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
060import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
061import org.apache.hadoop.hbase.replication.ReplicationException;
062import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
063import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
064import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
065import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
066import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
067import org.apache.hadoop.hbase.replication.ReplicationQueueData;
068import org.apache.hadoop.hbase.replication.ReplicationQueueId;
069import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
070import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
071import org.apache.hadoop.hbase.replication.ReplicationUtils;
072import org.apache.hadoop.hbase.replication.SyncReplicationState;
073import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
074import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
075import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
076import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
077import org.apache.hadoop.hbase.util.FutureUtils;
078import org.apache.hadoop.hbase.util.Pair;
079import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
080import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
081import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
082import org.apache.hadoop.hbase.zookeeper.ZKConfig;
083import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
084import org.apache.yetus.audience.InterfaceAudience;
085import org.apache.zookeeper.KeeperException;
086import org.slf4j.Logger;
087import org.slf4j.LoggerFactory;
088
089import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
090import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
091
092/**
093 * Manages and performs all replication admin operations.
094 * <p>
095 * Used to add/remove a replication peer.
096 * <p>
097 * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
098 * supporting migrating across different replication peer storages without restarting master.
099 */
100@InterfaceAudience.Private
101public class ReplicationPeerManager implements ConfigurationObserver {
102
103  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
104
105  private volatile ReplicationPeerStorage peerStorage;
106
107  private final ReplicationQueueStorage queueStorage;
108
109  private final ConcurrentMap<String, ReplicationPeerDescription> peers;
110
111  private final ImmutableMap<SyncReplicationState,
112    EnumSet<SyncReplicationState>> allowedTransition =
113      Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
114        EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
115        SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
116        SyncReplicationState.DOWNGRADE_ACTIVE,
117        EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
118
119  private final String clusterId;
120
121  private volatile Configuration conf;
122
123  // for dynamic recreating ReplicationPeerStorage.
124  private final FileSystem fs;
125
126  private final ZKWatcher zk;
127
  /**
   * Callback used to prepare the replication queue storage before it is used. It is invoked through
   * {@code initializeQueueStorage()} in this class.
   */
  @FunctionalInterface
  interface ReplicationQueueStorageInitializer {

    /**
     * Performs the initialization.
     * @throws IOException if the initialization fails
     */
    void initialize() throws IOException;
  }
133
134  private final ReplicationQueueStorageInitializer queueStorageInitializer;
135
136  // we will mock this class in UT so leave the constructor as package private and not mark the
137  // class as final, since mockito can not mock a final class
138  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
139    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
140    Configuration conf, String clusterId,
141    ReplicationQueueStorageInitializer queueStorageInitializer) {
142    this.fs = fs;
143    this.zk = zk;
144    this.peerStorage = peerStorage;
145    this.queueStorage = queueStorage;
146    this.peers = peers;
147    this.conf = conf;
148    this.clusterId = clusterId;
149    this.queueStorageInitializer = queueStorageInitializer;
150  }
151
152  private void checkQueuesDeleted(String peerId)
153    throws ReplicationException, DoNotRetryIOException {
154    List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(peerId);
155    if (!queueIds.isEmpty()) {
156      throw new DoNotRetryIOException("There are still " + queueIds.size()
157        + " undeleted queue(s) for peerId: " + peerId + ", first is " + queueIds.get(0));
158    }
159    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
160      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
161    }
162  }
163
164  private void initializeQueueStorage() throws IOException {
165    queueStorageInitializer.initialize();
166  }
167
168  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
169    throws ReplicationException, IOException {
170    if (peerId.contains("-")) {
171      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
172    }
173    checkPeerConfig(peerConfig);
174    if (peerConfig.isSyncReplication()) {
175      checkSyncReplicationPeerConfigConflict(peerConfig);
176    }
177    if (peers.containsKey(peerId)) {
178      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
179    }
180
181    // lazy create table
182    initializeQueueStorage();
183    // make sure that there is no queues with the same peer id. This may happen when we create a
184    // peer with the same id with a old deleted peer. If the replication queues for the old peer
185    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
186    // file may also be replicated.
187    checkQueuesDeleted(peerId);
188  }
189
190  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
191    ReplicationPeerDescription desc = peers.get(peerId);
192    if (desc == null) {
193      throw new ReplicationPeerNotFoundException(peerId);
194    }
195    return desc;
196  }
197
198  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
199    ReplicationPeerDescription desc = peers.get(peerId);
200    if (
201      desc != null && desc.getPeerConfig().isSyncReplication()
202        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())
203    ) {
204      throw new DoNotRetryIOException(
205        "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState()
206          + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly.");
207    }
208  }
209
210  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
211    ReplicationPeerDescription pd = checkPeerExists(peerId);
212    checkPeerInDAStateIfSyncReplication(peerId);
213    return pd.getPeerConfig();
214  }
215
216  void preEnablePeer(String peerId) throws DoNotRetryIOException {
217    ReplicationPeerDescription desc = checkPeerExists(peerId);
218    if (desc.isEnabled()) {
219      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
220    }
221  }
222
223  void preDisablePeer(String peerId) throws DoNotRetryIOException {
224    ReplicationPeerDescription desc = checkPeerExists(peerId);
225    if (!desc.isEnabled()) {
226      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
227    }
228  }
229
230  /**
231   * Return the old peer description. Can never be null.
232   */
233  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
234    throws DoNotRetryIOException {
235    checkPeerConfig(peerConfig);
236    ReplicationPeerDescription desc = checkPeerExists(peerId);
237    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
238    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
239      throw new DoNotRetryIOException(
240        "Changing the cluster key on an existing peer is not allowed. Existing key '"
241          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
242          + peerConfig.getClusterKey() + "'");
243    }
244
245    if (
246      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
247        oldPeerConfig.getReplicationEndpointImpl())
248    ) {
249      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
250        + "on an existing peer is not allowed. Existing class '"
251        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
252        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
253    }
254
255    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
256      throw new DoNotRetryIOException(
257        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal "
258          + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId
259          + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
260    }
261
262    if (oldPeerConfig.isSyncReplication()) {
263      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
264        throw new DoNotRetryIOException(
265          "Changing the replicated namespace/table config on a synchronous replication "
266            + "peer(peerId: " + peerId + ") is not allowed.");
267      }
268    }
269    return desc;
270  }
271
272  /** Returns the old desciption of the peer */
273  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
274    SyncReplicationState state) throws DoNotRetryIOException {
275    ReplicationPeerDescription desc = checkPeerExists(peerId);
276    SyncReplicationState fromState = desc.getSyncReplicationState();
277    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
278    if (allowedToStates == null || !allowedToStates.contains(state)) {
279      throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState
280        + " to " + state + " for peer id=" + peerId);
281    }
282    return desc;
283  }
284
285  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
286    throws ReplicationException {
287    if (peers.containsKey(peerId)) {
288      // this should be a retry, just return
289      return;
290    }
291    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
292    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
293    SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication()
294      ? SyncReplicationState.DOWNGRADE_ACTIVE
295      : SyncReplicationState.NONE;
296    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
297    peers.put(peerId,
298      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
299  }
300
301  public void removePeer(String peerId) throws ReplicationException {
302    if (!peers.containsKey(peerId)) {
303      // this should be a retry, just return
304      return;
305    }
306    peerStorage.removePeer(peerId);
307    peers.remove(peerId);
308  }
309
310  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
311    ReplicationPeerDescription desc = peers.get(peerId);
312    if (desc.isEnabled() == enabled) {
313      // this should be a retry, just return
314      return;
315    }
316    peerStorage.setPeerState(peerId, enabled);
317    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
318      desc.getSyncReplicationState()));
319  }
320
321  public boolean getPeerState(String peerId) throws ReplicationException {
322    ReplicationPeerDescription desc = peers.get(peerId);
323    if (desc != null) {
324      return desc.isEnabled();
325    } else {
326      throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
327    }
328  }
329
330  public void enablePeer(String peerId) throws ReplicationException {
331    setPeerState(peerId, true);
332  }
333
334  public void disablePeer(String peerId) throws ReplicationException {
335    setPeerState(peerId, false);
336  }
337
338  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
339    throws ReplicationException {
340    // the checking rules are too complicated here so we give up checking whether this is a retry.
341    ReplicationPeerDescription desc = peers.get(peerId);
342    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
343    ReplicationPeerConfigBuilder newPeerConfigBuilder =
344      ReplicationPeerConfig.newBuilder(peerConfig);
345    // we need to use the new conf to overwrite the old one.
346    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
347    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
348    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
349    peerStorage.updatePeerConfig(peerId, newPeerConfig);
350    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
351      desc.getSyncReplicationState()));
352  }
353
354  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
355    if (pattern == null) {
356      return new ArrayList<>(peers.values());
357    }
358    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
359      .collect(Collectors.toList());
360  }
361
362  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
363    ReplicationPeerDescription desc = peers.get(peerId);
364    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
365  }
366
367  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
368    queueStorage.removeLastSequenceIds(peerId);
369  }
370
371  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
372    throws ReplicationException {
373    peerStorage.setPeerNewSyncReplicationState(peerId, state);
374  }
375
376  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
377    throws ReplicationException {
378    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
379      // Only transit if this is not a retry
380      peerStorage.transitPeerSyncReplicationState(peerId);
381    }
382    ReplicationPeerDescription desc = peers.get(peerId);
383    if (desc.getSyncReplicationState() != newState) {
384      // Only recreate the desc if this is not a retry
385      peers.put(peerId,
386        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
387    }
388  }
389
390  public void removeAllQueues(String peerId) throws ReplicationException {
391    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
392    // on-going when the refresh peer config procedure is done, if a RS which has already been
393    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
394    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
395    // source, then the queue will leave there until the another RS detects the crash and helps
396    // removing the queue.
397    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
398    // claiming, it will either under the old RS or under the new RS, and a queue can only be
399    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
400    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
401    // unless it has already been removed by others.
402    queueStorage.removeAllQueues(peerId);
403    queueStorage.removeAllQueues(peerId);
404  }
405
406  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
407    removeAllQueues(peerId);
408    queueStorage.removePeerFromHFileRefs(peerId);
409  }
410
411  private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint)
412    throws DoNotRetryIOException {
413    if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) {
414      return;
415    }
416    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
417    URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey);
418    try {
419      if (connectionUri != null) {
420        ConnectionRegistryFactory.validate(connectionUri);
421      } else {
422        ZKConfig.validateClusterKey(clusterKey);
423      }
424    } catch (IOException e) {
425      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
426    }
427    if (endpoint != null && endpoint.canReplicateToSameCluster()) {
428      return;
429    }
430    // make sure we do not replicate to same cluster
431    String peerClusterId;
432    try {
433      if (connectionUri != null) {
434        // fetch cluster id through standard admin API
435        try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf);
436          Admin admin = conn.getAdmin()) {
437          peerClusterId =
438            admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId();
439        }
440      } else {
441        // Create the peer cluster config for get peer cluster id
442        Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
443        try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
444          peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
445        }
446      }
447    } catch (IOException | KeeperException e) {
448      // we just want to check whether we will replicate to the same cluster, so if we get an error
449      // while getting the cluster id of the peer cluster, it means we are not connecting to
450      // ourselves, as we are still alive. So here we just log the error and continue
451      LOG.warn("Can't get peerClusterId for clusterKey=" + clusterKey, e);
452      return;
453    }
454    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
455    // peerClusterId value, which is the same as the source clusterId
456    if (clusterId.equals(peerClusterId)) {
457      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
458        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
459    }
460  }
461
462  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
463    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
464    ReplicationEndpoint endpoint = null;
465    if (!StringUtils.isBlank(replicationEndpointImpl)) {
466      try {
467        // try creating a instance
468        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
469          .getDeclaredConstructor().newInstance();
470      } catch (Throwable e) {
471        throw new DoNotRetryIOException(
472          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
473          e);
474      }
475    }
476    checkClusterKey(peerConfig.getClusterKey(), endpoint);
477
478    if (peerConfig.replicateAllUserTables()) {
479      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
480      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
481      // cluster.
482      if (
483        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
484          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
485      ) {
486        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
487          + "when you want replicate all cluster");
488      }
489      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
490        peerConfig.getExcludeTableCFsMap());
491    } else {
492      // If replicate_all flag is false, it means all user tables can't be replicated to peer
493      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
494      // cluster.
495      if (
496        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
497          || (peerConfig.getExcludeTableCFsMap() != null
498            && !peerConfig.getExcludeTableCFsMap().isEmpty())
499      ) {
500        throw new DoNotRetryIOException(
501          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
502            + " when replicate_all flag is false");
503      }
504      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
505        peerConfig.getTableCFsMap());
506    }
507
508    if (peerConfig.isSyncReplication()) {
509      checkPeerConfigForSyncReplication(peerConfig);
510    }
511
512    checkConfiguredWALEntryFilters(peerConfig);
513  }
514
515  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
516    throws DoNotRetryIOException {
517    // This is used to reduce the difficulty for implementing the sync replication state transition
518    // as we need to reopen all the related regions.
519    // TODO: Add namespace, replicat_all flag back
520    if (peerConfig.replicateAllUserTables()) {
521      throw new DoNotRetryIOException(
522        "Only support replicated table config for sync replication peer");
523    }
524    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
525      throw new DoNotRetryIOException(
526        "Only support replicated table config for sync replication peer");
527    }
528    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
529      throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
530    }
531    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
532      if (cfs != null && !cfs.isEmpty()) {
533        throw new DoNotRetryIOException(
534          "Only support replicated table config for sync replication peer");
535      }
536    }
537
538    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
539    if (!remoteWALDir.isAbsolute()) {
540      throw new DoNotRetryIOException(
541        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
542    }
543    URI remoteWALDirUri = remoteWALDir.toUri();
544    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
545      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir()
546        + " is not qualified, you must provide scheme and authority");
547    }
548  }
549
550  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
551    throws DoNotRetryIOException {
552    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
553      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
554        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
555        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
556          throw new DoNotRetryIOException(
557            "Table " + tableName + " has been replicated by peer " + entry.getKey());
558        }
559      }
560    }
561  }
562
563  /**
564   * Set a namespace in the peer config means that all tables in this namespace will be replicated
565   * to the peer cluster.
566   * <ol>
567   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
568   * the peer config.</li>
569   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
570   * config.</li>
571   * </ol>
572   * <p>
573   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
574   * replicated to the peer cluster.
575   * <ol>
576   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
577   * this namespace to the peer config.</li>
578   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
579   * exclude namespace.</li>
580   * </ol>
581   */
582  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
583    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
584    if (namespaces == null || namespaces.isEmpty()) {
585      return;
586    }
587    if (tableCfs == null || tableCfs.isEmpty()) {
588      return;
589    }
590    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
591      TableName table = entry.getKey();
592      if (namespaces.contains(table.getNamespaceAsString())) {
593        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces "
594          + table.getNamespaceAsString() + " in peer config");
595      }
596    }
597  }
598
599  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
600    throws DoNotRetryIOException {
601    String filterCSV = peerConfig.getConfiguration()
602      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
603    if (filterCSV != null && !filterCSV.isEmpty()) {
604      String[] filters = filterCSV.split(",");
605      for (String filter : filters) {
606        try {
607          Class.forName(filter).getDeclaredConstructor().newInstance();
608        } catch (Exception e) {
609          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
610            + " could not be created. Failing add/update peer operation.", e);
611        }
612      }
613    }
614  }
615
616  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
617    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
618      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
619      .collect(Collectors.toList());
620  }
621
622  @RestrictedApi(explanation = "Should only be called in tests", link = "",
623      allowedOnPath = ".*/src/test/.*")
624  public ReplicationPeerStorage getPeerStorage() {
625    return peerStorage;
626  }
627
628  public ReplicationQueueStorage getQueueStorage() {
629    return queueStorage;
630  }
631
632  private static Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer>
633    createReplicationQueueStorage(MasterServices services) throws IOException {
634    Configuration conf = services.getConfiguration();
635    TableName replicationQueueTableName =
636      TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
637        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
638    ReplicationQueueStorageInitializer initializer;
639    if (services.getTableDescriptors().exists(replicationQueueTableName)) {
640      // no need to create the table
641      initializer = () -> {
642      };
643    } else {
644      // lazy create the replication table.
645      initializer = new ReplicationQueueStorageInitializer() {
646
647        private volatile boolean created = false;
648
649        @Override
650        public void initialize() throws IOException {
651          if (created) {
652            return;
653          }
654          synchronized (this) {
655            if (created) {
656              return;
657            }
658            if (services.getTableDescriptors().exists(replicationQueueTableName)) {
659              created = true;
660              return;
661            }
662            long procId = services.createSystemTable(ReplicationStorageFactory
663              .createReplicationQueueTableDescriptor(replicationQueueTableName));
664            ProcedureExecutor<MasterProcedureEnv> procExec = services.getMasterProcedureExecutor();
665            ProcedureSyncWait.waitFor(procExec.getEnvironment(), TimeUnit.MINUTES.toMillis(1),
666              "Creating table " + replicationQueueTableName, () -> procExec.isFinished(procId));
667          }
668        }
669      };
670    }
671    return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
672      services.getConnection(), conf, replicationQueueTableName), initializer);
673  }
674
675  public static ReplicationPeerManager create(MasterServices services, String clusterId)
676    throws ReplicationException, IOException {
677    Configuration conf = services.getConfiguration();
678    FileSystem fs = services.getMasterFileSystem().getFileSystem();
679    ZKWatcher zk = services.getZooKeeper();
680    ReplicationPeerStorage peerStorage =
681      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
682    Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> pair =
683      createReplicationQueueStorage(services);
684    ReplicationQueueStorage queueStorage = pair.getFirst();
685    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
686    for (String peerId : peerStorage.listPeerIds()) {
687      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
688      if (
689        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
690          .equals(peerConfig.getReplicationEndpointImpl())
691      ) {
692        // If memstore region replication is enabled, there will be a special replication peer
693        // usually called 'region_replica_replication'. We do not need to load it or migrate its
694        // replication queue data since we do not rely on general replication framework for
695        // region replication in 3.x now, please see HBASE-26233 for more details.
696        // We can not delete it now since region server with old version still want to update
697        // the replicated wal position to zk, if we delete the replication queue zk node, rs
698        // will crash. See HBASE-29169 for more details.
699        // In MigrateReplicationQueueFromZkToTableProcedure, finally we will call a deleteAllData on
700        // the old replication queue storage, to make sure that we will delete the the queue data
701        // for this peer and also the peer info in replication peer storage
702        LOG.info("Found old region replica replication peer '{}', skip loading it", peerId);
703        continue;
704      }
705      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
706      peerStorage.updatePeerConfig(peerId, peerConfig);
707      boolean enabled = peerStorage.isPeerEnabled(peerId);
708      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
709      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
710    }
711    return new ReplicationPeerManager(fs, zk, peerStorage, queueStorage, peers, conf, clusterId,
712      pair.getSecond());
713  }
714
715  /**
716   * For replication peer cluster key or endpoint class, null and empty string is same. So here
717   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
718   */
719  private boolean isStringEquals(String s1, String s2) {
720    if (StringUtils.isBlank(s1)) {
721      return StringUtils.isBlank(s2);
722    }
723    return s1.equals(s2);
724  }
725
726  @Override
727  public void onConfigurationChange(Configuration conf) {
728    this.conf = conf;
729    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
730  }
731
732  private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
733    Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
734    zkData.getWalOffsets().forEach((wal, offset) -> {
735      String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
736      groupOffsets.compute(walGroup, (k, oldOffset) -> {
737        if (oldOffset == null) {
738          return new ReplicationGroupOffset(wal, offset);
739        }
740        // we should record the first wal's offset
741        long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
742        long walTs = AbstractFSWALProvider.getTimestamp(wal);
743        if (walTs < oldWalTs) {
744          return new ReplicationGroupOffset(wal, offset);
745        }
746        return oldOffset;
747      });
748    });
749    return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
750  }
751
752  private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
753    throws Exception {
754    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
755      oldQueueStorage.listAllQueues();
756    for (;;) {
757      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
758      if (pair == null) {
759        return;
760      }
761      queueStorage.batchUpdateQueues(pair.getFirst(),
762        pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId()))
763          .map(this::convert).collect(Collectors.toList()));
764    }
765  }
766
767  private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
768    throws Exception {
769    MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
770    for (;;) {
771      List<ZkLastPushedSeqId> list = iter.next();
772      if (list == null) {
773        return;
774      }
775      queueStorage.batchUpdateLastSequenceIds(list.stream()
776        .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList()));
777    }
778  }
779
780  private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
781    throws Exception {
782    MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
783    for (;;) {
784      Pair<String, List<String>> pair = iter.next();
785      if (pair == null) {
786        return;
787      }
788      if (peers.containsKey(pair.getFirst())) {
789        queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond());
790      }
791    }
792  }
793
794  private interface ExceptionalRunnable {
795    void run() throws Exception;
796  }
797
798  private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
799    CompletableFuture<?> future = new CompletableFuture<>();
800    executor.execute(() -> {
801      try {
802        task.run();
803        future.complete(null);
804      } catch (Exception e) {
805        future.completeExceptionally(e);
806      }
807    });
808    return future;
809  }
810
811  // this is for upgrading from 2.x to 3.x, in 3.x we will not load the 'region_replica_replication'
812  // peer, but we still need to know whether we have it on the old storage
813  boolean hasRegionReplicaReplicationPeer() throws ReplicationException {
814    return peerStorage.listPeerIds().stream()
815      .anyMatch(p -> p.equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER));
816  }
817
818  /**
819   * Submit the migration tasks to the given {@code executor}.
820   */
821  CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
822    // the replication queue table creation is asynchronous and will be triggered by addPeer, so
823    // here we need to manually initialize it since we will not call addPeer.
824    try {
825      initializeQueueStorage();
826    } catch (IOException e) {
827      return FutureUtils.failedFuture(e);
828    }
829    ZKReplicationQueueStorageForMigration oldStorage =
830      new ZKReplicationQueueStorageForMigration(zookeeper, conf);
831    return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
832      runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
833      runAsync(() -> migrateHFileRefs(oldStorage), executor));
834  }
835
836  void deleteLegacyRegionReplicaReplicationPeer() throws ReplicationException {
837    for (String peerId : peerStorage.listPeerIds()) {
838      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
839      if (
840        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
841          .equals(peerConfig.getReplicationEndpointImpl())
842      ) {
843        LOG.info("Delete old region replica replication peer '{}'", peerId);
844        peerStorage.removePeer(peerId);
845      }
846    }
847  }
848}