/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Manages and performs all replication admin operations.
 * <p>
 * Used to add/remove a replication peer.
 * <p>
 * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
 * supporting migrating across different replication peer storages without restarting master.
 */
@InterfaceAudience.Private
public class ReplicationPeerManager implements ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);

  // volatile because it is replaced in onConfigurationChange
  private volatile ReplicationPeerStorage peerStorage;

  private final ReplicationQueueStorage queueStorage;

  // in-memory view of the known peers, keyed by peer id
  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  // maps a sync replication state to the set of states it is allowed to transit to
  private final ImmutableMap<SyncReplicationState,
    EnumSet<SyncReplicationState>> allowedTransition =
      Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
        EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
        SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
        SyncReplicationState.DOWNGRADE_ACTIVE,
        EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

  private final String clusterId;

  // volatile because it is replaced in onConfigurationChange
  private volatile Configuration conf;

  // for dynamic recreating ReplicationPeerStorage.
  private final FileSystem fs;

  private final ZKWatcher zk;

  /**
   * Hook which is invoked before the replication queue storage is used when adding a peer, see
   * {@link #preAddPeer(String, ReplicationPeerConfig)}.
   */
  @FunctionalInterface
  interface ReplicationQueueStorageInitializer {

    void initialize() throws IOException;
  }

  private final ReplicationQueueStorageInitializer queueStorageInitializer;

  // we will mock this class in UT so leave the constructor as package private and not mark the
  // class as final, since mockito can not mock a final class
  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
    Configuration conf, String clusterId,
    ReplicationQueueStorageInitializer queueStorageInitializer) {
    this.fs = fs;
    this.zk = zk;
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
    this.conf = conf;
    this.clusterId = clusterId;
    this.queueStorageInitializer = queueStorageInitializer;
  }

  /**
   * Make sure that the queue storage holds no leftover data for the given peer.
   * @throws DoNotRetryIOException if there is still at least one replication queue for the peer, or
   *                               the peer is still present in the hfile-refs queue
   */
  private void checkQueuesDeleted(String peerId)
    throws ReplicationException, DoNotRetryIOException {
    List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(peerId);
    if (!queueIds.isEmpty()) {
      throw new DoNotRetryIOException("There are still " + queueIds.size()
        + " undeleted queue(s) for peerId: " + peerId + ", first is " + queueIds.get(0));
    }
    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
    }
  }

  /** Run the configured {@link ReplicationQueueStorageInitializer}. */
  private void initializeQueueStorage() throws IOException {
    queueStorageInitializer.initialize();
  }

  /**
   * Validate a request for adding a new peer. The peer id must not contain '-', the peer config
   * must pass the config checks, the peer must not exist yet and there must be no leftover queues
   * for the same peer id.
   */
  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException, IOException {
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peerConfig.isSyncReplication()) {
      checkSyncReplicationPeerConfigConflict(peerConfig);
    }
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }

    // lazy create table
    initializeQueueStorage();
    // make sure that there is no queues with the same peer id. This may happen when we create a
    // peer with the same id with an old deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
    // file may also be replicated.
    checkQueuesDeleted(peerId);
  }

  /**
   * Return the description of the given peer.
   * @throws ReplicationPeerNotFoundException if the peer is not known
   */
  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc == null) {
      throw new ReplicationPeerNotFoundException(peerId);
    }
    return desc;
  }

  /**
   * A known synchronous replication peer must be in DOWNGRADE_ACTIVE state, otherwise a
   * {@link DoNotRetryIOException} is thrown. Unknown peers and non sync replication peers pass.
   */
  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (
      desc != null && desc.getPeerConfig().isSyncReplication()
        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())
    ) {
      throw new DoNotRetryIOException(
        "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState()
          + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly.");
    }
  }

  /** Validate a remove peer request and return the config of the peer which will be removed. */
  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription pd = checkPeerExists(peerId);
    checkPeerInDAStateIfSyncReplication(peerId);
    return pd.getPeerConfig();
  }

  /** Fail if the peer does not exist or has already been enabled. */
  void preEnablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
    }
  }

  /** Fail if the peer does not exist or has already been disabled. */
  void preDisablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
    }
  }

  /**
   * Return the old peer description. Can never be null.
   * <p>
   * The cluster key, the replication endpoint implementation and the remote wal dir can not be
   * changed, and for a synchronous replication peer the namespaces/table-cfs config can not be
   * changed either.
   */
  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    checkPeerConfig(peerConfig);
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
      throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '"
          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
          + peerConfig.getClusterKey() + "'");
    }

    if (
      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
        oldPeerConfig.getReplicationEndpointImpl())
    ) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
        + "on an existing peer is not allowed. Existing class '"
        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
    }

    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
      throw new DoNotRetryIOException(
        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal "
          + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId
          + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
    }

    if (oldPeerConfig.isSyncReplication()) {
      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
        throw new DoNotRetryIOException(
          "Changing the replicated namespace/table config on a synchronous replication "
            + "peer(peerId: " + peerId + ") is not allowed.");
      }
    }
    return desc;
  }

  /** Returns the old description of the peer */
  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
    SyncReplicationState state) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    SyncReplicationState fromState = desc.getSyncReplicationState();
    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
    if (allowedToStates == null || !allowedToStates.contains(state)) {
      throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState
        + " to " + state + " for peer id=" + peerId);
    }
    return desc;
  }

  /**
   * Add the peer to the peer storage and to the in-memory peer map. Does nothing if the peer is
   * already in the in-memory map. A synchronous replication peer starts in DOWNGRADE_ACTIVE state,
   * any other peer in NONE state.
   */
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
    throws ReplicationException {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
    SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication()
      ? SyncReplicationState.DOWNGRADE_ACTIVE
      : SyncReplicationState.NONE;
    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
    peers.put(peerId,
      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
  }

  /**
   * Remove the peer from the peer storage and from the in-memory peer map. Does nothing if the
   * peer is not in the in-memory map.
   */
  public void removePeer(String peerId) throws ReplicationException {
    if (!peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }

  /**
   * Persist the enabled flag of the peer and replace its in-memory description. The peer is
   * expected to be present in the in-memory map, there is no null check here.
   */
  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.isEnabled() == enabled) {
      // this should be a retry, just return
      return;
    }
    peerStorage.setPeerState(peerId, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
      desc.getSyncReplicationState()));
  }

  /**
   * Returns whether the given peer is enabled, according to the in-memory peer map.
   * @throws ReplicationException if the peer does not exist
   */
  public boolean getPeerState(String peerId) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc != null) {
      return desc.isEnabled();
    } else {
      throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
    }
  }

  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }

  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }

  /**
   * Persist the new peer config and replace the in-memory description of the peer. The
   * configuration entries of the old peer config are kept unless the new peer config has an entry
   * with the same key. The peer is expected to be present in the in-memory map.
   */
  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    // the checking rules are too complicated here so we give up checking whether this is a retry.
    ReplicationPeerDescription desc = peers.get(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    ReplicationPeerConfigBuilder newPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(peerConfig);
    // we need to use the new conf to overwrite the old one, so put the old entries first and then
    // the new ones.
    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
    peerStorage.updatePeerConfig(peerId, newPeerConfig);
    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
      desc.getSyncReplicationState()));
  }

  /**
   * List the known peers.
   * @param pattern if not null, only the peers whose id fully matches the pattern are returned
   * @return a newly created list
   */
  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
    if (pattern == null) {
      return new ArrayList<>(peers.values());
    }
    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
      .collect(Collectors.toList());
  }

  /** Returns the config of the given peer, or empty if the peer is not known. */
  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
    ReplicationPeerDescription desc = peers.get(peerId);
    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
  }

  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }

  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
    throws ReplicationException {
    peerStorage.setPeerNewSyncReplicationState(peerId, state);
  }

  /**
   * Finish a sync replication state transition, in the peer storage and in the in-memory peer map.
   * Both steps are skipped individually if they look like having been done already.
   */
  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
    throws ReplicationException {
    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
      // Only transit if this is not a retry
      peerStorage.transitPeerSyncReplicationState(peerId);
    }
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.getSyncReplicationState() != newState) {
      // Only recreate the desc if this is not a retry
      peers.put(peerId,
        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
    }
  }

  /** Remove all the replication queues of the given peer, using two passes. */
  public void removeAllQueues(String peerId) throws ReplicationException {
    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
    // on-going when the refresh peer config procedure is done, if a RS which has already been
    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
    // source, then the queue will leave there until the another RS detects the crash and helps
    // removing the queue.
    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
    // claiming, it will either under the old RS or under the new RS, and a queue can only be
    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
    // unless it has already been removed by others.
    queueStorage.removeAllQueues(peerId);
    queueStorage.removeAllQueues(peerId);
  }

  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    removeAllQueues(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }

  /**
   * Validate the cluster key, and make sure that we will not replicate to ourselves.
   * <p>
   * Nothing is checked if an endpoint is given and it is not a {@link HBaseReplicationEndpoint}.
   * The same cluster check is skipped if the endpoint reports that it can replicate to the same
   * cluster, or if the cluster id of the peer cluster can not be fetched.
   * @param endpoint the instantiated replication endpoint, or null if none is configured
   * @throws DoNotRetryIOException if the cluster key is invalid or points to this cluster
   */
  private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint)
    throws DoNotRetryIOException {
    if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) {
      return;
    }
    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
    URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey);
    try {
      if (connectionUri != null) {
        ConnectionRegistryFactory.validate(connectionUri);
      } else {
        ZKConfig.validateClusterKey(clusterKey);
      }
    } catch (IOException e) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
    if (endpoint != null && endpoint.canReplicateToSameCluster()) {
      return;
    }
    // make sure we do not replicate to same cluster
    String peerClusterId;
    try {
      if (connectionUri != null) {
        // fetch cluster id through standard admin API
        try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf);
          Admin admin = conn.getAdmin()) {
          peerClusterId =
            admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId();
        }
      } else {
        // Create the peer cluster config for get peer cluster id
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
        try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
          peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
        }
      }
    } catch (IOException | KeeperException e) {
      // we just want to check whether we will replicate to the same cluster, so if we get an error
      // while getting the cluster id of the peer cluster, it means we are not connecting to
      // ourselves, as we are still alive. So here we just log the error and continue
      LOG.warn("Can't get peerClusterId for clusterKey={}", clusterKey, e);
      return;
    }
    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
    // peerClusterId value, which is the same as the source clusterId
    if (clusterId.equals(peerClusterId)) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
    }
  }

  /**
   * Validate a peer config: the replication endpoint class must be instantiable, the cluster key
   * must be valid, the namespaces/table-cfs settings must agree with the replicate_all flag, the
   * sync replication constraints must hold if it is a sync replication peer, and the configured
   * WAL entry filters must be instantiable.
   */
  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    ReplicationEndpoint endpoint = null;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      try {
        // try creating an instance
        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
          .getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
    }
    checkClusterKey(peerConfig.getClusterKey(), endpoint);

    if (peerConfig.replicateAllUserTables()) {
      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
      // cluster.
      if (
        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
          + "when you want replicate all cluster");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If replicate_all flag is false, it means all user tables can't be replicated to peer
      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
      // cluster.
      if (
        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
          || (peerConfig.getExcludeTableCFsMap() != null
            && !peerConfig.getExcludeTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException(
          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
            + " when replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    if (peerConfig.isSyncReplication()) {
      checkPeerConfigForSyncReplication(peerConfig);
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }

  /**
   * Extra checks for a sync replication peer: only whole tables can be configured, i.e, no
   * replicate_all, no namespaces and no column families, and the remote WAL directory must be an
   * absolute path with scheme and authority.
   */
  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    // This is used to reduce the difficulty for implementing the sync replication state transition
    // as we need to reopen all the related regions.
    // TODO: Add namespace, replicate_all flag back
    if (peerConfig.replicateAllUserTables()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
      throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
    }
    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
      if (cfs != null && !cfs.isEmpty()) {
        throw new DoNotRetryIOException(
          "Only support replicated table config for sync replication peer");
      }
    }

    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
    if (!remoteWALDir.isAbsolute()) {
      throw new DoNotRetryIOException(
        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
    }
    URI remoteWALDirUri = remoteWALDir.toUri();
    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir()
        + " is not qualified, you must provide scheme and authority");
    }
  }

  /**
   * Fail if any table of the given peer config is already in the table-cfs map of an existing sync
   * replication peer.
   */
  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
          throw new DoNotRetryIOException(
            "Table " + tableName + " has been replicated by peer " + entry.getKey());
        }
      }
    }
  }

  /**
   * Set a namespace in the peer config means that all tables in this namespace will be replicated
   * to the peer cluster.
   * <ol>
   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
   * the peer config.</li>
   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
   * config.</li>
   * </ol>
   * <p>
   * Set an exclude namespace in the peer config means that all tables in this namespace can't be
   * replicated to the peer cluster.
   * <ol>
   * <li>If peer config already has an exclude namespace, then not allow set any exclude table of
   * this namespace to the peer config.</li>
   * <li>If peer config already has an exclude table, then not allow set this table's namespace as
   * an exclude namespace.</li>
   * </ol>
   */
  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
    if (namespaces == null || namespaces.isEmpty()) {
      return;
    }
    if (tableCfs == null || tableCfs.isEmpty()) {
      return;
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (namespaces.contains(table.getNamespaceAsString())) {
        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces "
          + table.getNamespaceAsString() + " in peer config");
      }
    }
  }

  /**
   * Try instantiating every class listed, comma separated, under
   * {@link BaseReplicationEndpoint#REPLICATION_WALENTRYFILTER_CONFIG_KEY} in the peer
   * configuration. The entries are used as is, without trimming.
   * @throws DoNotRetryIOException if any of the classes can not be instantiated
   */
  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    String filterCSV = peerConfig.getConfiguration()
      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
    if (filterCSV != null && !filterCSV.isEmpty()) {
      String[] filters = filterCSV.split(",");
      for (String filter : filters) {
        try {
          Class.forName(filter).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
            + " could not be created. Failing add/update peer operation.", e);
        }
      }
    }
  }

  /** Returns the ids of the serial peers which need to replicate the given table. */
  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
      .filter(p -> p.getPeerConfig().needToReplicate(tableName))
      .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ReplicationPeerStorage getPeerStorage() {
    return peerStorage;
  }

  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  /**
   * Create the replication queue storage, together with the initializer which makes sure that the
   * replication queue table exists. If the table already exists when calling this method, the
   * initializer is a no-op.
   */
  private static Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer>
    createReplicationQueueStorage(MasterServices services) throws IOException {
    Configuration conf = services.getConfiguration();
    TableName replicationQueueTableName =
      TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
    ReplicationQueueStorageInitializer initializer;
    if (services.getTableDescriptors().exists(replicationQueueTableName)) {
      // no need to create the table
      initializer = () -> {
      };
    } else {
      // lazy create the replication table.
      initializer = new ReplicationQueueStorageInitializer() {

        // only set once we have seen that the table exists
        private volatile boolean created = false;

        @Override
        public void initialize() throws IOException {
          if (created) {
            return;
          }
          synchronized (this) {
            if (created) {
              return;
            }
            if (services.getTableDescriptors().exists(replicationQueueTableName)) {
              created = true;
              return;
            }
            long procId = services.createSystemTable(ReplicationStorageFactory
              .createReplicationQueueTableDescriptor(replicationQueueTableName));
            ProcedureExecutor<MasterProcedureEnv> procExec = services.getMasterProcedureExecutor();
            // the flag is not set here, so the next call will check the table existence again
            ProcedureSyncWait.waitFor(procExec.getEnvironment(), TimeUnit.MINUTES.toMillis(1),
              "Creating table " + replicationQueueTableName, () -> procExec.isFinished(procId));
          }
        }
      };
    }
    return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
      services.getConnection(), conf, replicationQueueTableName), initializer);
  }

  /**
   * Create a ReplicationPeerManager, with all the peers in the peer storage loaded, except the
   * legacy region replication peer. For every loaded peer, the base peer configs are applied and
   * the resulting peer config is written back to the peer storage.
   */
  public static ReplicationPeerManager create(MasterServices services, String clusterId)
    throws ReplicationException, IOException {
    Configuration conf = services.getConfiguration();
    FileSystem fs = services.getMasterFileSystem().getFileSystem();
    ZKWatcher zk = services.getZooKeeper();
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
    Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> pair =
      createReplicationQueueStorage(services);
    ReplicationQueueStorage queueStorage = pair.getFirst();
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
      if (
        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
          .equals(peerConfig.getReplicationEndpointImpl())
      ) {
        // If memstore region replication is enabled, there will be a special replication peer
        // usually called 'region_replica_replication'. We do not need to load it or migrate its
        // replication queue data since we do not rely on general replication framework for
        // region replication in 3.x now, please see HBASE-26233 for more details.
        // We can not delete it now since region server with old version still want to update
        // the replicated wal position to zk, if we delete the replication queue zk node, rs
        // will crash. See HBASE-29169 for more details.
        // In MigrateReplicationQueueFromZkToTableProcedure, finally we will call a deleteAllData on
        // the old replication queue storage, to make sure that we will delete the queue data
        // for this peer and also the peer info in replication peer storage
        LOG.info("Found old region replica replication peer '{}', skip loading it", peerId);
        continue;
      }
      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
      peerStorage.updatePeerConfig(peerId, peerConfig);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
    }
    return new ReplicationPeerManager(fs, zk, peerStorage, queueStorage, peers, conf, clusterId,
      pair.getSecond());
  }

  /**
   * For replication peer cluster key or endpoint class, null and empty string is same. So here
   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }

  /** Switch to the new configuration and recreate the peer storage based on it. */
  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
  }

  /**
   * Convert the per wal offsets of a zookeeper based queue to per wal group offsets. Wals are
   * grouped by their wal prefix, and for each group the offset of the wal with the smallest
   * timestamp is kept.
   */
  private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
    Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
    zkData.getWalOffsets().forEach((wal, offset) -> {
      String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
      groupOffsets.compute(walGroup, (k, oldOffset) -> {
        if (oldOffset == null) {
          return new ReplicationGroupOffset(wal, offset);
        }
        // we should record the first wal's offset
        long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
        long walTs = AbstractFSWALProvider.getTimestamp(wal);
        if (walTs < oldWalTs) {
          return new ReplicationGroupOffset(wal, offset);
        }
        return oldOffset;
      });
    });
    return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
  }

  /**
   * Copy the replication queues from the old zookeeper based storage to the current queue storage,
   * one region server at a time, until the iterator returns null. Queues whose peer is not in the
   * in-memory peer map are dropped.
   */
  private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      oldQueueStorage.listAllQueues();
    for (;;) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      if (pair == null) {
        return;
      }
      queueStorage.batchUpdateQueues(pair.getFirst(),
        pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId()))
          .map(this::convert).collect(Collectors.toList()));
    }
  }

  private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
    for (;;) {
771 List<ZkLastPushedSeqId> list = iter.next(); 772 if (list == null) { 773 return; 774 } 775 queueStorage.batchUpdateLastSequenceIds(list.stream() 776 .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList())); 777 } 778 } 779 780 private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage) 781 throws Exception { 782 MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs(); 783 for (;;) { 784 Pair<String, List<String>> pair = iter.next(); 785 if (pair == null) { 786 return; 787 } 788 if (peers.containsKey(pair.getFirst())) { 789 queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond()); 790 } 791 } 792 } 793 794 private interface ExceptionalRunnable { 795 void run() throws Exception; 796 } 797 798 private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) { 799 CompletableFuture<?> future = new CompletableFuture<>(); 800 executor.execute(() -> { 801 try { 802 task.run(); 803 future.complete(null); 804 } catch (Exception e) { 805 future.completeExceptionally(e); 806 } 807 }); 808 return future; 809 } 810 811 // this is for upgrading from 2.x to 3.x, in 3.x we will not load the 'region_replica_replication' 812 // peer, but we still need to know whether we have it on the old storage 813 boolean hasRegionReplicaReplicationPeer() throws ReplicationException { 814 return peerStorage.listPeerIds().stream() 815 .anyMatch(p -> p.equals(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER)); 816 } 817 818 /** 819 * Submit the migration tasks to the given {@code executor}. 820 */ 821 CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) { 822 // the replication queue table creation is asynchronous and will be triggered by addPeer, so 823 // here we need to manually initialize it since we will not call addPeer. 
824 try { 825 initializeQueueStorage(); 826 } catch (IOException e) { 827 return FutureUtils.failedFuture(e); 828 } 829 ZKReplicationQueueStorageForMigration oldStorage = 830 new ZKReplicationQueueStorageForMigration(zookeeper, conf); 831 return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor), 832 runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor), 833 runAsync(() -> migrateHFileRefs(oldStorage), executor)); 834 } 835 836 void deleteLegacyRegionReplicaReplicationPeer() throws ReplicationException { 837 for (String peerId : peerStorage.listPeerIds()) { 838 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 839 if ( 840 ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME 841 .equals(peerConfig.getReplicationEndpointImpl()) 842 ) { 843 LOG.info("Delete old region replica replication peer '{}'", peerId); 844 peerStorage.removePeer(peerId); 845 } 846 } 847 } 848}