/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources.
 * There are two classes of sources:
 * <ul>
 * <li>Normal sources are persistent and one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and our only goal is to finish
 * replicating the WAL queue it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues in a local old source.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>There is no need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap
 * and there is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for
 * peer operations.</li>
 * <li>We need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with
 * {@link #addPeer(String)}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from {@link #walsById}, so there
 * is no race with {@link #removePeer(String)}. The only case that needs synchronization is
 * between {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link #preLogRoll(Path)}.</li>
 * <li>There is no need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods
 * which modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
 * {@link #walsByIdRecoveredQueues} first, then start up a {@link ReplicationSourceInterface}. So
 * there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and
 * {@link #removePeer(String)}, we already synchronize on {@link #oldsources}, so there is no need
 * to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>We need to synchronize on {@link #latestPaths} to avoid a newly opened source missing a new
 * log.</li>
 * <li>We need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
 */
@InterfaceAudience.Private
public class ReplicationSourceManager {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // all the sources that read this RS's logs and every peer only has one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. Replication state so it can be recovered after
   * a crash.
   * queueStorage upkeep is spread throughout this class and passed to ReplicationSource instances
   * so they can do updates themselves. Not all ReplicationSource instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Map<String, Path> latestPaths;
  // Path to the wals directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  private AtomicLong totalBufferUsed = new AtomicLong();
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * A special ReplicationSource for hbase:meta Region Read Replicas. Usually this reference
   * remains empty. If an hbase:meta Region is opened on this server, we will create an instance
   * of an hbase:meta CatalogReplicationSource and it will live the life of the Server thereafter;
   * i.e. we will not shut it down even if the hbase:meta moves away from this server (in case it
   * later gets moved back). We synchronize on this instance when testing for presence and, if
   * absent, while creating it, so it is only created and started once.
   */
  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param conf         the configuration to use
   * @param server       the server for this region server
   * @param fs           the file system to use
   * @param logDir       the directory that contains all wal directories of live RSs
   * @param oldLogDir    the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    // ConcurrentHashMap is thread-safe.
    // Generally, reading happens more often than modifying.
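    // Note: sources is keyed by peer id; for a normal source the queue id equals the peer id.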
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30 seconds
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashMap<>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }

  /**
   * Adds a normal source per registered peer cluster.
   */
  void init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
  }

  /**
   * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
   * HFile Refs
   * @param peerId the id of the replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * 1. Remove peer from replicationPeers 2. Remove all the recovered sources for the specified id
   * and related replication queues 3. Remove the normal source and related replication queue 4.
   * Remove HFile Refs
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG
      .info("Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup
      // Delete queue from storage and memory; the queue id is the same as the peer id for a
      // normal source
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * @return a new 'classic' user-space replication source.
   * @param queueId the id of the replication queue to associate the ReplicationSource with.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
    throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
    // Init the just created replication source. Pass the default walProvider's wal file length
    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
    // replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
      : p -> OptionalLong.empty();
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, new MetricsSource(queueId));
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage.
   * For the newly added peer, we only need to enqueue the latest log of each wal group and do
   * replication.
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (!latestPaths.isEmpty()) {
        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
          Path walPath = walPrefixAndPath.getValue();
          NavigableSet<String> wals = new TreeSet<>();
          wals.add(walPath.getName());
          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          // Abort the RS and throw an exception to make the add peer operation fail
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to trigger the new
   * replication state changes or new replication config changes. Here we don't need to change the
   * replication queue storage, only to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId
      + " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src;
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      src = createSource(peerId, peer);
      this.sources.put(peerId, src);
      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, so the source thread may
   * be interrupted. We need to handle that case instead of aborting the region server.
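   * When such an interruption is detected, the failure is rethrown as a
   * ReplicationRuntimeException rather than triggering an abort (see the handling below).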
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (
        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException
      ) {
        // ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
        // that the thread is interrupted deep down in the stack, so it should bypass the following
        // processing logic and propagate to the topmost layer which can handle this exception
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * This method logs the current position to storage and also cleans old logs from the
   * replication queue.
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
    WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    String queueId = source.getQueueId();
    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId,
      fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered());
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
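   * The removed wals are also deleted from the replication queue storage.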
   * @param log            Path to the log
   * @param inclusive      whether we should also remove the given log file
   * @param queueId        id of the replication queue
   * @param queueRecovered Whether this is a recovered queue
   */
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths to avoid a newly opened source missing the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log to queue storage fails, abort the RS and throw an exception to
        // make the log roll fail
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      latestPaths.put(logPrefix, newLog);
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog, 607 source.getQueueId()); 608 } 609 } 610 611 void claimQueue(ServerName deadRS, String queue) { 612 // Wait a bit before transferring the queues, we may be shutting down. 613 // This sleep may not be enough in some cases. 614 try { 615 Thread.sleep(sleepBeforeFailover 616 + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover)); 617 } catch (InterruptedException e) { 618 LOG.warn("Interrupted while waiting before transferring a queue."); 619 Thread.currentThread().interrupt(); 620 } 621 // We try to lock that rs' queue directory 622 if (server.isStopped()) { 623 LOG.info("Not transferring queue since we are shutting down"); 624 return; 625 } 626 // After claim the queues from dead region server, wewill skip to start the 627 // RecoveredReplicationSource if the peer has been removed. but there's possible that remove a 628 // peer with peerId = 2 and add a peer with peerId = 2 again during failover. So we need to get 629 // a copy of the replication peer first to decide whether we should start the 630 // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip to 631 // start the RecoveredReplicationSource, Otherwise the rs will abort (See HBASE-20475). 632 String peerId = new ReplicationQueueInfo(queue).getPeerId(); 633 ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId); 634 if (oldPeer == null) { 635 LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist", 636 peerId, queue); 637 return; 638 } 639 Pair<String, SortedSet<String>> claimedQueue; 640 try { 641 claimedQueue = queueStorage.claimQueue(deadRS, queue, server.getServerName()); 642 } catch (ReplicationException e) { 643 LOG.error( 644 "ReplicationException: cannot claim dead region ({})'s " + "replication queue. Znode : ({})" 645 + " Possible solution: check if znode size exceeds jute.maxBuffer value. " 646 + " If so, increase it for both client and server side.", 647 deadRS, queueStorage.getRsNode(deadRS), e); 648 server.abort("Failed to claim queue from dead regionserver.", e); 649 return; 650 } 651 if (claimedQueue.getSecond().isEmpty()) { 652 return; 653 } 654 String queueId = claimedQueue.getFirst(); 655 Set<String> walsSet = claimedQueue.getSecond(); 656 ReplicationPeerImpl peer = replicationPeers.getPeer(peerId); 657 if (peer == null || peer != oldPeer) { 658 LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, deadRS); 659 abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId)); 660 return; 661 } 662 if ( 663 server instanceof ReplicationSyncUp.DummyServer 664 && peer.getPeerState().equals(PeerState.DISABLED) 665 ) { 666 LOG.warn( 667 "Peer {} is disabled. 
ReplicationSyncUp tool will skip " + "replicating data to this peer.", 668 peerId); 669 return; 670 } 671 672 ReplicationSourceInterface src; 673 try { 674 src = createSource(queueId, peer); 675 } catch (IOException e) { 676 LOG.error("Can not create replication source for peer {} and queue {}", peerId, queueId, e); 677 server.abort("Failed to create replication source after claiming queue.", e); 678 return; 679 } 680 // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer 681 synchronized (oldsources) { 682 peer = replicationPeers.getPeer(src.getPeerId()); 683 if (peer == null || peer != oldPeer) { 684 src.terminate("Recovered queue doesn't belong to any current peer"); 685 deleteQueue(queueId); 686 return; 687 } 688 // track sources in walsByIdRecoveredQueues 689 Map<String, NavigableSet<String>> walsByGroup = new HashMap<>(); 690 walsByIdRecoveredQueues.put(queueId, walsByGroup); 691 for (String wal : walsSet) { 692 String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal); 693 NavigableSet<String> wals = walsByGroup.get(walPrefix); 694 if (wals == null) { 695 wals = new TreeSet<>(); 696 walsByGroup.put(walPrefix, wals); 697 } 698 wals.add(wal); 699 } 700 oldsources.add(src); 701 LOG.info("Added source for recovered queue {}", src.getQueueId()); 702 for (String wal : walsSet) { 703 LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); 704 src.enqueueLog(new Path(oldLogDir, wal)); 705 } 706 src.startup(); 707 } 708 } 709 710 /** 711 * Terminate the replication on this region server 712 */ 713 public void join() { 714 this.executor.shutdown(); 715 for (ReplicationSourceInterface source : this.sources.values()) { 716 source.terminate("Region server is closing"); 717 } 718 synchronized (oldsources) { 719 for (ReplicationSourceInterface source : this.oldsources) { 720 source.terminate("Region server is closing"); 721 } 722 } 723 } 724 725 /** 726 * Get a copy of the wals of the normal sources on this rs 727 * @return a sorted set of wal names 728 */ 729 public Map<String, Map<String, NavigableSet<String>>> getWALs() { 730 return Collections.unmodifiableMap(walsById); 731 } 732 733 /** 734 * Get a copy of the wals of the recovered sources on this rs 735 * @return a sorted set of wal names 736 */ 737 Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() { 738 return Collections.unmodifiableMap(walsByIdRecoveredQueues); 739 } 740 741 /** 742 * Get a list of all the normal sources of this rs 743 * @return list of all normal sources 744 */ 745 public List<ReplicationSourceInterface> getSources() { 746 return new ArrayList<>(this.sources.values()); 747 } 748 749 /** 750 * Get a list of all the recovered sources of this rs 751 * @return list of all recovered sources 752 */ 753 public List<ReplicationSourceInterface> getOldSources() { 754 return this.oldsources; 755 } 756 757 /** 758 * Get the normal source for a given peer 759 * @return the normal source for the give peer if it exists, otherwise null. 
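   * @param peerId the id of the replication peer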
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  Set<Path> getLastestPath() {
    synchronized (latestPaths) {
      return Sets.newHashSet(latestPaths.values());
    }
  }

  public long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }

  /**
   * Returns the maximum size in bytes of edits held in memory which are pending replication across
   * all sources inside this RegionServer.
   */
  public long getTotalBufferLimit() {
    return totalBufferLimit;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the local file system
   * @return Handle on the local file system
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    // Print stats that apply across all Replication Sources
    stats.append("Global stats: ");
    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
      .append(getTotalBufferLimit()).append("B\n");
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }

  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
    return this.globalMetrics;
  }

  /**
   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. Create
   * it once only. If it exists already, use the existing one.
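   * The created source is kept in {@link #catalogReplicationSource} and is reused if hbase:meta
   * later moves back to this server.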
   * @see #removeCatalogReplicationSource(RegionInfo)
   * @see #addSource(String) This is specialization on the addSource method.
   */
  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
    throws IOException {
    // Poor-man's putIfAbsent
    synchronized (this.catalogReplicationSource) {
      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
      return rs != null
        ? rs
        : this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
    }
  }

  /**
   * Remove the hbase:meta Catalog replication source. Called when we close hbase:meta.
   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
   */
  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
    // comes back to this server.
  }

  /**
   * Create, initialize, and start the Catalog ReplicationSource. Presumes called one-time only
   * (caller must ensure one-time only call). This ReplicationSource is NOT created via
   * {@link ReplicationSourceFactory}.
   * @see #addSource(String) This is a specialization of the addSource call.
   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
   *      why the special handling).
   */
  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
    throws IOException {
    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by
    // the Master on a 'move' operation. Need to do extra work if we did NOT instantiate the
    // provider.
    WALProvider walProvider = this.walFactory.getMetaWALProvider();
    boolean instantiate = walProvider == null;
    if (instantiate) {
      walProvider = this.walFactory.getMetaProvider();
    }
    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
    // read replicas feature that makes use of the source does a reset on a crash of the WAL
    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for
    // detail.
    CatalogReplicationSourcePeer peer =
      new CatalogReplicationSourcePeer(this.conf, this.clusterId.toString());
    final ReplicationSourceInterface crs = new CatalogReplicationSource();
    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
    // Add listener on the provider so we can pick up the WAL to replicate on roll.
    WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
        crs.enqueueLog(newPath);
      }
    };
    walProvider.addWALActionsListener(listener);
    if (!instantiate) {
      // If we did not instantiate provider, need to add our listener on already-created WAL
      // instance too (listeners are passed by provider to WAL instance on creation but if provider
      // created already, our listener add above is missed). And add the current WAL file to the
      // Replication Source so it can start replicating it.
      WAL wal = walProvider.getWAL(regionInfo);
      wal.registerWALActionsListener(listener);
      crs.enqueueLog(((AbstractFSWAL) wal).getCurrentFileName());
    }
    return crs.startup();
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  /**
   * Acquire the buffer quota for an {@link Entry} which is added to a {@link WALEntryBatch}.
   * @param entry the wal entry which is added to {@link WALEntryBatch} and should acquire buffer
   *              quota.
   * @return true if we should clear the buffer and push all
   */
  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
    long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
    return this.acquireBufferQuota(entrySize);
  }

  /**
   * Release the buffer quota of a {@link WALEntryBatch} which was acquired by
   * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
   * @return the released buffer quota size.
   */
  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
    long usedBufferSize = walEntryBatch.getUsedBufferSize();
    if (usedBufferSize > 0) {
      this.releaseBufferQuota(usedBufferSize);
    }
    return usedBufferSize;
  }

  /**
   * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
   * {@link ReplicationSourceManager#totalBufferLimit}.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   *         {@link ReplicationSourceManager#totalBufferLimit}; we should stop increasing the
   *         buffer and ship all.
   */
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    long newBufferUsed = addTotalBufferUsed(size);
    return newBufferUsed >= totalBufferLimit;
  }

  /**
   * Release the buffer quota which was acquired by
   * {@link ReplicationSourceManager#acquireBufferQuota}.
   */
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    addTotalBufferUsed(-size);
  }

  private long addTotalBufferUsed(long size) {
    if (size == 0) {
      return totalBufferUsed.get();
    }
    long newBufferUsed = totalBufferUsed.addAndGet(size);
    // Record the new buffer usage
    this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return newBufferUsed;
  }

  /**
   * Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   * {@link ReplicationSourceManager#totalBufferLimit} for the peer.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
   *         {@link ReplicationSourceManager#totalBufferLimit}.
   */
  boolean checkBufferQuota(String peerId) {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferLimit) {
      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
        peerId, totalBufferUsed.get(), totalBufferLimit);
      return false;
    }
    return true;
  }
}