001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.regex.Pattern; 030import java.util.stream.Collectors; 031import org.apache.commons.lang3.StringUtils; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 040import org.apache.hadoop.hbase.conf.ConfigurationObserver; 041import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 042import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 043import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 044import org.apache.hadoop.hbase.replication.ReplicationException; 045import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 046import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 047import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 048import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 049import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 052import org.apache.hadoop.hbase.replication.ReplicationUtils; 053import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 054import org.apache.hadoop.hbase.zookeeper.ZKConfig; 055import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.apache.zookeeper.KeeperException; 058 059/** 060 * Manages and performs all replication admin operations. 061 * <p> 062 * Used to add/remove a replication peer. 063 * <p> 064 * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for 065 * supporting migrating across different replication peer storages without restarting master. 066 */ 067@InterfaceAudience.Private 068public class ReplicationPeerManager implements ConfigurationObserver { 069 070 private volatile ReplicationPeerStorage peerStorage; 071 072 private final ReplicationQueueStorage queueStorage; 073 074 private final ConcurrentMap<String, ReplicationPeerDescription> peers; 075 076 private final String clusterId; 077 078 private volatile Configuration conf; 079 080 // for dynamic recreating ReplicationPeerStorage. 081 private final FileSystem fs; 082 083 private final ZKWatcher zk; 084 085 ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage, 086 ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers, 087 Configuration conf, String clusterId) { 088 this.fs = fs; 089 this.zk = zk; 090 this.peerStorage = peerStorage; 091 this.queueStorage = queueStorage; 092 this.peers = peers; 093 this.conf = conf; 094 this.clusterId = clusterId; 095 } 096 097 private void checkQueuesDeleted(String peerId) 098 throws ReplicationException, DoNotRetryIOException { 099 for (ServerName replicator : queueStorage.getListOfReplicators()) { 100 List<String> queueIds = queueStorage.getAllQueues(replicator); 101 for (String queueId : queueIds) { 102 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 103 if (queueInfo.getPeerId().equals(peerId)) { 104 throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + ", replicator: " 105 + replicator + ", queueId: " + queueId); 106 } 107 } 108 } 109 if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { 110 throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); 111 } 112 } 113 114 void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) 115 throws DoNotRetryIOException, ReplicationException { 116 if (peerId.contains("-")) { 117 throw new DoNotRetryIOException("Found invalid peer name: " + peerId); 118 } 119 checkPeerConfig(peerConfig); 120 if (peers.containsKey(peerId)) { 121 throw new DoNotRetryIOException("Replication peer " + peerId + " already exists"); 122 } 123 // make sure that there is no queues with the same peer id. This may happen when we create a 124 // peer with the same id with a old deleted peer. If the replication queues for the old peer 125 // have not been cleaned up yet then we should not create the new peer, otherwise the old wal 126 // file may also be replicated. 127 checkQueuesDeleted(peerId); 128 } 129 130 private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { 131 ReplicationPeerDescription desc = peers.get(peerId); 132 if (desc == null) { 133 throw new ReplicationPeerNotFoundException(peerId); 134 } 135 return desc; 136 } 137 138 ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { 139 return checkPeerExists(peerId).getPeerConfig(); 140 } 141 142 void preEnablePeer(String peerId) throws DoNotRetryIOException { 143 ReplicationPeerDescription desc = checkPeerExists(peerId); 144 if (desc.isEnabled()) { 145 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); 146 } 147 } 148 149 void preDisablePeer(String peerId) throws DoNotRetryIOException { 150 ReplicationPeerDescription desc = checkPeerExists(peerId); 151 if (!desc.isEnabled()) { 152 throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); 153 } 154 } 155 156 /** 157 * Return the old peer description. Can never be null. 158 */ 159 ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 160 throws DoNotRetryIOException { 161 checkPeerConfig(peerConfig); 162 ReplicationPeerDescription desc = checkPeerExists(peerId); 163 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 164 if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { 165 throw new DoNotRetryIOException( 166 "Changing the cluster key on an existing peer is not allowed. Existing key '" 167 + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" 168 + peerConfig.getClusterKey() + "'"); 169 } 170 171 if ( 172 !isStringEquals(peerConfig.getReplicationEndpointImpl(), 173 oldPeerConfig.getReplicationEndpointImpl()) 174 ) { 175 throw new DoNotRetryIOException("Changing the replication endpoint implementation class " 176 + "on an existing peer is not allowed. Existing class '" 177 + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId 178 + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); 179 } 180 return desc; 181 } 182 183 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) 184 throws ReplicationException { 185 if (peers.containsKey(peerId)) { 186 // this should be a retry, just return 187 return; 188 } 189 peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); 190 ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); 191 peerStorage.addPeer(peerId, copiedPeerConfig, enabled); 192 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig)); 193 } 194 195 public void removePeer(String peerId) throws ReplicationException { 196 if (!peers.containsKey(peerId)) { 197 // this should be a retry, just return 198 return; 199 } 200 peerStorage.removePeer(peerId); 201 peers.remove(peerId); 202 } 203 204 private void setPeerState(String peerId, boolean enabled) throws ReplicationException { 205 ReplicationPeerDescription desc = peers.get(peerId); 206 if (desc.isEnabled() == enabled) { 207 // this should be a retry, just return 208 return; 209 } 210 peerStorage.setPeerState(peerId, enabled); 211 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig())); 212 } 213 214 public boolean getPeerState(String peerId) throws ReplicationException { 215 ReplicationPeerDescription desc = peers.get(peerId); 216 if (desc != null) { 217 return desc.isEnabled(); 218 } else { 219 throw new ReplicationException("Replication Peer of " + peerId + " does not exist."); 220 } 221 } 222 223 public void enablePeer(String peerId) throws ReplicationException { 224 setPeerState(peerId, true); 225 } 226 227 public void disablePeer(String peerId) throws ReplicationException { 228 setPeerState(peerId, false); 229 } 230 231 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 232 throws ReplicationException { 233 // the checking rules are too complicated here so we give up checking whether this is a retry. 234 ReplicationPeerDescription desc = peers.get(peerId); 235 ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); 236 ReplicationPeerConfigBuilder newPeerConfigBuilder = 237 ReplicationPeerConfig.newBuilder(peerConfig); 238 // we need to use the new conf to overwrite the old one. 239 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 240 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 241 newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); 242 newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); 243 ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); 244 peerStorage.updatePeerConfig(peerId, newPeerConfig); 245 peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig)); 246 } 247 248 public List<ReplicationPeerDescription> listPeers(Pattern pattern) { 249 if (pattern == null) { 250 return new ArrayList<>(peers.values()); 251 } 252 return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) 253 .collect(Collectors.toList()); 254 } 255 256 public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { 257 ReplicationPeerDescription desc = peers.get(peerId); 258 return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); 259 } 260 261 void removeAllLastPushedSeqIds(String peerId) throws ReplicationException { 262 queueStorage.removeLastSequenceIds(peerId); 263 } 264 265 void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException { 266 // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still 267 // on-going when the refresh peer config procedure is done, if a RS which has already been 268 // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in 269 // the scan here, and if the RS who has claimed the queue crashed before creating recovered 270 // source, then the queue will leave there until the another RS detects the crash and helps 271 // removing the queue. 272 // A two pass scan can solve the problem. Anyway, the queue will not disappear during the 273 // claiming, it will either under the old RS or under the new RS, and a queue can only be 274 // claimed once after the refresh peer procedure done(as the next claim queue will just delete 275 // it), so we can make sure that a two pass scan will finally find the queue and remove it, 276 // unless it has already been removed by others. 277 ReplicationUtils.removeAllQueues(queueStorage, peerId); 278 ReplicationUtils.removeAllQueues(queueStorage, peerId); 279 queueStorage.removePeerFromHFileRefs(peerId); 280 } 281 282 private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { 283 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); 284 ReplicationEndpoint endpoint = null; 285 if (!StringUtils.isBlank(replicationEndpointImpl)) { 286 try { 287 // try creating a instance 288 endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class) 289 .getDeclaredConstructor().newInstance(); 290 } catch (Throwable e) { 291 throw new DoNotRetryIOException( 292 "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, 293 e); 294 } 295 } 296 // Endpoints implementing HBaseReplicationEndpoint need to check cluster key 297 if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) { 298 checkClusterKey(peerConfig.getClusterKey()); 299 // Check if endpoint can replicate to the same cluster 300 if (endpoint == null || !endpoint.canReplicateToSameCluster()) { 301 checkSameClusterKey(peerConfig.getClusterKey()); 302 } 303 } 304 305 if (peerConfig.replicateAllUserTables()) { 306 // If replicate_all flag is true, it means all user tables will be replicated to peer cluster. 307 // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer 308 // cluster. 309 if ( 310 (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 311 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty()) 312 ) { 313 throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " 314 + "when you want replicate all cluster"); 315 } 316 checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(), 317 peerConfig.getExcludeTableCFsMap()); 318 } else { 319 // If replicate_all flag is false, it means all user tables can't be replicated to peer 320 // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer 321 // cluster. 322 if ( 323 (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty()) 324 || (peerConfig.getExcludeTableCFsMap() != null 325 && !peerConfig.getExcludeTableCFsMap().isEmpty()) 326 ) { 327 throw new DoNotRetryIOException( 328 "Need clean exclude-namespaces or exclude-table-cfs config firstly" 329 + " when replicate_all flag is false"); 330 } 331 checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), 332 peerConfig.getTableCFsMap()); 333 } 334 335 checkConfiguredWALEntryFilters(peerConfig); 336 } 337 338 /** 339 * Set a namespace in the peer config means that all tables in this namespace will be replicated 340 * to the peer cluster. 341 * <ol> 342 * <li>If peer config already has a namespace, then not allow set any table of this namespace to 343 * the peer config.</li> 344 * <li>If peer config already has a table, then not allow set this table's namespace to the peer 345 * config.</li> 346 * </ol> 347 * <p> 348 * Set a exclude namespace in the peer config means that all tables in this namespace can't be 349 * replicated to the peer cluster. 350 * <ol> 351 * <li>If peer config already has a exclude namespace, then not allow set any exclude table of 352 * this namespace to the peer config.</li> 353 * <li>If peer config already has a exclude table, then not allow set this table's namespace as a 354 * exclude namespace.</li> 355 * </ol> 356 */ 357 private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, 358 Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { 359 if (namespaces == null || namespaces.isEmpty()) { 360 return; 361 } 362 if (tableCfs == null || tableCfs.isEmpty()) { 363 return; 364 } 365 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { 366 TableName table = entry.getKey(); 367 if (namespaces.contains(table.getNamespaceAsString())) { 368 throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " 369 + table.getNamespaceAsString() + " in peer config"); 370 } 371 } 372 } 373 374 private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) 375 throws DoNotRetryIOException { 376 String filterCSV = peerConfig.getConfiguration() 377 .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); 378 if (filterCSV != null && !filterCSV.isEmpty()) { 379 String[] filters = filterCSV.split(","); 380 for (String filter : filters) { 381 try { 382 Class.forName(filter).getDeclaredConstructor().newInstance(); 383 } catch (Exception e) { 384 throw new DoNotRetryIOException("Configured WALEntryFilter " + filter 385 + " could not be created. Failing add/update peer operation.", e); 386 } 387 } 388 } 389 } 390 391 private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { 392 try { 393 ZKConfig.validateClusterKey(clusterKey); 394 } catch (IOException e) { 395 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); 396 } 397 } 398 399 private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException { 400 String peerClusterId = ""; 401 try { 402 // Create the peer cluster config for get peer cluster id 403 Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); 404 try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { 405 peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); 406 } 407 } catch (IOException | KeeperException e) { 408 throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e); 409 } 410 // In rare case, zookeeper setting may be messed up. That leads to the incorrect 411 // peerClusterId value, which is the same as the source clusterId 412 if (clusterId.equals(peerClusterId)) { 413 throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey 414 + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint"); 415 } 416 } 417 418 public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { 419 return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) 420 .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) 421 .collect(Collectors.toList()); 422 } 423 424 public ReplicationQueueStorage getQueueStorage() { 425 return queueStorage; 426 } 427 428 public static ReplicationPeerManager create(FileSystem fs, ZKWatcher zk, Configuration conf, 429 String clusterId) throws ReplicationException { 430 ReplicationPeerStorage peerStorage = 431 ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); 432 ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); 433 for (String peerId : peerStorage.listPeerIds()) { 434 ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); 435 436 peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig); 437 peerStorage.updatePeerConfig(peerId, peerConfig); 438 boolean enabled = peerStorage.isPeerEnabled(peerId); 439 peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); 440 } 441 return new ReplicationPeerManager(fs, zk, peerStorage, 442 ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId); 443 } 444 445 /** 446 * For replication peer cluster key or endpoint class, null and empty string is same. So here 447 * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. 448 */ 449 private boolean isStringEquals(String s1, String s2) { 450 if (StringUtils.isBlank(s1)) { 451 return StringUtils.isBlank(s2); 452 } 453 return s1.equals(s2); 454 } 455 456 @Override 457 public void onConfigurationChange(Configuration conf) { 458 this.conf = conf; 459 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); 460 } 461}