/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream. Currently does not handle more than 1
 * slave cluster. For each slave cluster it selects a random number of peers using a replication
 * ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100 region
 * servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happening while starting up the source
  // and that a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits
   * we do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set forms the head of the WALEntry filter chain; the filter
   * picked up from the configured endpoint and the ClusterMarkingEntryFilter are appended to it
   * to create the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers.
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          storage for the state of the replication queues
   * @param replicationPeer       the peer this source replicates to
   * @param server                the server for this region server
   * @param queueId               the id of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider used to get the length of WAL files that are still being written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
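    // If the peer itself has a non-zero bandwidth configured, it takes precedence over this
    // default; see getCurrentBandwidth() below.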
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // new wal group observed after source startup, start a new worker thread to track it
        // notice: it's possible that a wal is enqueued after this.running is set but the worker
        // thread is still not launched, so it's necessary to check workerThreads before starting
        // the worker
        tryStartNewShipper(walPrefix);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
        this.replicationQueueInfo.getQueueId());
    }
  }

  @InterfaceAudience.Private
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook instead of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
        clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to the default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private void tryStartNewShipper(String walGroupId) {
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        ReplicationSourceWALReader walReader =
          createNewWALReader(walGroupId, worker.getStartPosition());
        Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
          + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
        worker.setWALReader(walReader);
        worker.startup(this::retryRefreshing);
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignoring the exception, as the WAL file size only affects the web UI", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call after {@link #initializeWALEntryFilter(UUID)} else it will be null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  // Log the error, exit if it is an OOME, and abort the server if configured to do so.
  private void checkError(Thread t, Throwable error) {
    RSRpcServices.exitIfOOME(error);
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), error);
    }
  }

  private void retryRefreshing(Thread t, Throwable error) {
    checkError(t, error);
    while (true) {
      if (server.isAborted() || server.isStopped() || server.isStopping()) {
        LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
        return;
      }
      try {
        LOG.info("Refreshing replication sources now due to previous error on thread: {}",
          t.getName());
        manager.refreshSources(getPeerId());
        break;
      } catch (Exception e) {
        LOG.error("Replication sources refresh failed.", e);
        sleepForRetries("Sleeping before trying to refresh sources again", maxRetriesMultiplier);
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset the throttler's cycle start tick when a sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
      LOG.info("ReplicationSource : " + peerId + " bandwidth throttling changed, currentBandWidth="
        + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use the default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic.
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * Check whether the peer is enabled or not.
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

  private void initialize() {
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    sleepMultiplier = 1;
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
    setSourceStartupStatus(false);
  }

  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the server main startup one can go on with its work.
        sourceRunning = false;
        checkError(t, e);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              checkError(t, error);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid join on ourselves.
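      // Interrupt it first in case it is blocked in sleepForRetries(), then wait for it to exit.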
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker is already stopped but there were still entries
        // batched, we need to clear the buffer used for non-processed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }

    // Can be null in test context.
    if (this.metrics != null) {
      if (clearMetrics) {
        this.metrics.clear();
      } else {
        this.metrics.terminate();
      }
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log").append("\n");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, long batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    this.manager.releaseBufferQuota(batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  /** Returns String to use as a log prefix that contains current peerId. */
  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }

  // Visible for testing purpose
  public long getTotalReplicatedEdits() {
    return totalReplicatedEdits.get();
  }
}