/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.findArchivedLog;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Class that handles the source of a replication stream. Currently does not handle more than 1
 * slave cluster. For each slave cluster it selects a random number of peers using a replication
 * ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100 region
 * servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the peer cluster for more
 * than 55 seconds by default.
 * </p>
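 * <p>
 * Typical lifecycle, as driven by the surrounding replication code: {@link #init} wires in
 * configuration, queue storage, the peer and metrics; {@link #startup()} spawns the asynchronous
 * initialization thread; {@link #enqueueLog(Path)} is called for each WAL to replicate; and
 * {@link #terminate(String)} shuts the source down.
 * </p>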
 */
@InterfaceAudience.Private
public class ReplicationSource implements ReplicationSourceInterface {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
  // per group queue size, keep no more than this number of logs in each wal group
  protected int queueSizePerGroup;
  protected ReplicationSourceLogQueue logQueue;
  protected ReplicationQueueStorage queueStorage;
  protected ReplicationPeer replicationPeer;

  protected Configuration conf;
  protected ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;

  // The manager of all sources to which we ping back our progress
  protected ReplicationSourceManager manager;
  // Should we stop everything?
  protected Server server;
  // How long should we sleep for each retry
  private long sleepForRetries;
  protected FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // The znode we currently play with
  protected String queueId;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates if this particular source is running
  volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // ReplicationEndpoint which will handle the actual replication
  private volatile ReplicationEndpoint replicationEndpoint;

  private boolean abortOnError;
  // This is needed for the startup loop to identify when there's already
  // an initialization happening (but not finished yet),
  // so that it doesn't try to submit another initialize thread.
  // NOTE: this should only be set to false at the end of the initialize method, prior to return.
  private AtomicBoolean startupOngoing = new AtomicBoolean(false);
  // Flag that signals an uncaught error happened while starting up the source
  // and a retry should be attempted
  private AtomicBoolean retryStartup = new AtomicBoolean(false);

  /**
   * A filter (or a chain of filters) for WAL entries; filters out edits.
   */
  protected volatile WALEntryFilter walEntryFilter;

  // throttler
  private ReplicationThrottler throttler;
  private long defaultBandwidth;
  private long currentBandwidth;
  private WALFileLengthProvider walFileLengthProvider;
  protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
    new ConcurrentHashMap<>();

  public static final String WAIT_ON_ENDPOINT_SECONDS =
    "hbase.replication.wait.on.endpoint.seconds";
  public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
  private int waitOnEndpointSeconds = -1;

  private Thread initThread;

  /**
   * WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to
   * skip.
   */
  private final Predicate<Path> filterInWALs;

  /**
   * Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we
   * do not want replicated, passed on to replication endpoints. This is the basic set. Down in
   * #initializeWALEntryFilter this set is added to the end of the WALEntry filter chain. These are
   * put after those that we pick up from the configured endpoints and other machinations to create
   * the final {@link #walEntryFilter}.
   * @see WALEntryFilter
   */
  private final List<WALEntryFilter> baseFilterOutWALEntries;

  ReplicationSource() {
    // Default, filters *in* all WALs but meta WALs & filters *out* all WALEntries of System Tables.
    this(p -> !AbstractFSWALProvider.isMetaFile(p),
      Lists.newArrayList(new SystemTableWALEntryFilter()));
  }

  /**
   * @param replicateWAL            Pass a filter to run against WAL Path; filter *in* WALs to
   *                                Replicate; i.e. return 'true' if you want to replicate the
   *                                content of the WAL.
   * @param baseFilterOutWALEntries Base set of filters you want applied always; filters *out*
   *                                WALEntries so they never make it out of this ReplicationSource.
   */
  ReplicationSource(Predicate<Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries) {
    this.filterInWALs = replicateWAL;
    this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
  }

  /**
   * Instantiation method used by region servers
   * @param conf                  configuration to use
   * @param fs                    file system to use
   * @param manager               replication manager to ping to
   * @param queueStorage          the storage for the replication queues
   * @param replicationPeer       the peer this source replicates to
   * @param server                the server for this region server
   * @param queueId               the id of our replication queue
   * @param clusterId             unique UUID for the cluster
   * @param walFileLengthProvider provides the length of WAL files that are still being written
   * @param metrics               metrics for replication source
   */
  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    this.server = server;
    this.conf = HBaseConfiguration.create(conf);
    this.waitOnEndpointSeconds =
      this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
    decorateConf();
    // 1 second
    this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
    // 5 minutes @ 1 sec per
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
    this.queueStorage = queueStorage;
    this.replicationPeer = replicationPeer;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.queueId = queueId;
    this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
    // ReplicationQueueInfo parses the peerId out of the znode for us
    this.peerId = this.replicationQueueInfo.getPeerId();

    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
    defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    currentBandwidth = getCurrentBandwidth();
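    // The throttler budgets bytes per 100ms cycle, so the per-cycle quota here is one tenth of
    // the configured per-second bandwidth.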
    this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
    this.walFileLengthProvider = walFileLengthProvider;

    this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true);

    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
      replicationPeer.getId(), this.currentBandwidth);
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  @Override
  public void enqueueLog(Path wal) {
    if (!this.filterInWALs.test(wal)) {
      LOG.trace("NOT replicating {}", wal);
      return;
    }
    // Use WAL prefix as the WALGroupId for this peer.
    String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
    boolean queueExists = logQueue.enqueueLog(wal, walPrefix);

    if (!queueExists) {
      if (this.isSourceActive() && this.walEntryFilter != null) {
        // A new wal group observed after source startup; start a new worker thread to track it.
        // Note: it's possible that a wal is enqueued after this.running is set but before the
        // worker thread is launched, so it's necessary to check workerThreads before starting it.
        tryStartNewShipper(walPrefix);
      }
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
        this.replicationQueueInfo.getQueueId());
    }
  }

  @InterfaceAudience.Private
  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
    return logQueue.getQueues();
  }

  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws ReplicationException {
    String peerId = replicationPeer.getId();
    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
      this.queueStorage.addHFileRefs(peerId, pairs);
      metrics.incrSizeOfHFileRefsQueue(pairs.size());
    } else {
      LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
        tableName, Bytes.toString(family), peerId);
    }
  }

  private ReplicationEndpoint createReplicationEndpoint()
    throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
    String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();

    ReplicationEndpoint replicationEndpoint;
    if (replicationEndpointImpl == null) {
      // Default to HBase inter-cluster replication endpoint; skip reflection
      replicationEndpoint = new HBaseInterClusterReplicationEndpoint();
    } else {
      try {
        replicationEndpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (NoSuchMethodException | InvocationTargetException e) {
        throw new IllegalArgumentException(e);
      }
    }
    if (rsServerHost != null) {
      ReplicationEndpoint newReplicationEndPoint =
        rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
      if (newReplicationEndPoint != null) {
        // Use the endpoint returned by the coprocessor hook in place of the configured one
        replicationEndpoint = newReplicationEndPoint;
      }
    }
    return replicationEndpoint;
  }

  private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
    throws IOException, TimeoutException {
    TableDescriptors tableDescriptors = null;
    if (server instanceof HRegionServer) {
      tableDescriptors = ((HRegionServer) server).getTableDescriptors();
    }
    replicationEndpoint
      .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
        clusterId, replicationPeer, metrics, tableDescriptors, server));
    replicationEndpoint.start();
    replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
  }

  private void initializeWALEntryFilter(UUID peerClusterId) {
    // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
    List<WALEntryFilter> filters = new ArrayList<>(this.baseFilterOutWALEntries);
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
    this.walEntryFilter = new ChainWALEntryFilter(filters);
  }

  private void tryStartNewShipper(String walGroupId) {
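    // compute() runs atomically for the walGroupId key, so even if enqueueLog and initialize race
    // to start a worker, at most one shipper/WAL reader pair is created per WAL group.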
    workerThreads.compute(walGroupId, (key, value) -> {
      if (value != null) {
        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
        return value;
      } else {
        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
        ReplicationSourceShipper worker = createNewShipper(walGroupId);
        ReplicationSourceWALReader walReader =
          createNewWALReader(walGroupId, worker.getStartPosition());
        Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
          + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
        worker.setWALReader(walReader);
        worker.startup(this::retryRefreshing);
        return worker;
      }
    });
  }

  @Override
  public Map<String, ReplicationStatus> getWalGroupStatus() {
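    // Build a per-WAL-group snapshot of replication progress (queue size, current file and
    // position, file size, age of last shipped op, delay), keyed by "peerId=>walGroupId".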
    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
    long ageOfLastShippedOp, replicationDelay, fileSize;
    for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
      String walGroupId = walGroupShipper.getKey();
      ReplicationSourceShipper shipper = walGroupShipper.getValue();
      ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
      int queueSize = logQueue.getQueueSize(walGroupId);
      replicationDelay = metrics.getReplicationDelay();
      Path currentPath = shipper.getCurrentPath();
      fileSize = -1;
      if (currentPath != null) {
        try {
          fileSize = getFileSize(currentPath);
        } catch (IOException e) {
          LOG.warn("Ignoring the exception; the WAL file size only affects the web UI", e);
        }
      } else {
        currentPath = new Path("NO_LOGS_IN_QUEUE");
        LOG.warn("{} No replication ongoing, waiting for new log", logPeerId());
      }
      ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
      statusBuilder.withPeerId(this.getPeerId()).withQueueSize(queueSize).withWalGroup(walGroupId)
        .withCurrentPath(currentPath).withCurrentPosition(shipper.getCurrentPosition())
        .withFileSize(fileSize).withAgeOfLastShippedOp(ageOfLastShippedOp)
        .withReplicationDelay(replicationDelay);
      sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
    }
    return sourceReplicationStatus;
  }

  private long getFileSize(Path currentPath) throws IOException {
    long fileSize;
    try {
      fileSize = fs.getContentSummary(currentPath).getLength();
    } catch (FileNotFoundException e) {
      Path archivedLogPath = findArchivedLog(currentPath, conf);
      // archivedLogPath can be null if unable to locate in archiveDir.
      if (archivedLogPath == null) {
        throw new FileNotFoundException("Couldn't find path: " + currentPath);
      }
      fileSize = fs.getContentSummary(archivedLogPath).getLength();
    }
    return fileSize;
  }

  protected ReplicationSourceShipper createNewShipper(String walGroupId) {
    return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
  }

  private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
    return replicationPeer.getPeerConfig().isSerial()
      ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
        this, walGroupId)
      : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter, this,
        walGroupId);
  }

  /**
   * Call this after {@link #initializeWALEntryFilter(UUID)}, otherwise it will return null.
   * @return WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
   */
  WALEntryFilter getWalEntryFilter() {
    return walEntryFilter;
  }

  // Log the error, exit the process if it is an OOME, and abort the server if configured to do so.
  private void checkError(Thread t, Throwable error) {
    RSRpcServices.exitIfOOME(error);
    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
    if (abortOnError) {
      server.abort("Unexpected exception in " + t.getName(), error);
    }
  }

  private void retryRefreshing(Thread t, Throwable error) {
    checkError(t, error);
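    // Keep retrying the source refresh until it succeeds or the server is shutting down.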
    while (true) {
      if (server.isAborted() || server.isStopped() || server.isStopping()) {
        LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
        return;
      }
      try {
        LOG.info("Refreshing replication sources now due to previous error on thread: {}",
          t.getName());
        manager.refreshSources(getPeerId());
        break;
      } catch (Exception e) {
        LOG.error("Replication sources refresh failed.", e);
        sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
      }
    }
  }

  @Override
  public ReplicationEndpoint getReplicationEndpoint() {
    return this.replicationEndpoint;
  }

  @Override
  public ReplicationSourceManager getSourceManager() {
    return this.manager;
  }

  @Override
  public void tryThrottle(int batchSize) throws InterruptedException {
    checkBandwidthChangeAndResetThrottler();
    if (throttler.isEnabled()) {
      long sleepTicks = throttler.getNextSleepInterval(batchSize);
      if (sleepTicks > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("{} To sleep {}ms for throttling control", logPeerId(), sleepTicks);
        }
        Thread.sleep(sleepTicks);
        // reset throttler's cycle start tick when sleep for throttling occurs
        throttler.resetStartTick();
      }
    }
  }

  private void checkBandwidthChangeAndResetThrottler() {
    long peerBandwidth = getCurrentBandwidth();
    if (peerBandwidth != currentBandwidth) {
      currentBandwidth = peerBandwidth;
      throttler.setBandwidth((double) currentBandwidth / 10.0);
482      LOG.info("ReplicationSource : " + peerId + " bandwidth throttling changed, currentBandWidth="
483        + currentBandwidth);
    }
  }

  private long getCurrentBandwidth() {
    long peerBandwidth = replicationPeer.getPeerBandwidth();
    // User can set peer bandwidth to 0 to use default bandwidth.
    return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
  }

  /**
   * Do the sleeping logic
   * @param msg             Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
          sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
      }
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * check whether the peer is enabled or not
   * @return true if the peer is enabled, otherwise false
   */
  @Override
  public boolean isPeerEnabled() {
    return replicationPeer.isPeerEnabled();
  }

  private void initialize() {
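    // Phase 1: create and start the ReplicationEndpoint, retrying with backoff while the source
    // stays active.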
    int sleepMultiplier = 1;
    while (this.isSourceActive()) {
      ReplicationEndpoint replicationEndpoint;
      try {
        replicationEndpoint = createReplicationEndpoint();
      } catch (Exception e) {
        LOG.warn("{} error creating ReplicationEndpoint, retry", logPeerId(), e);
        if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      try {
        initAndStartReplicationEndpoint(replicationEndpoint);
        this.replicationEndpoint = replicationEndpoint;
        break;
      } catch (Exception e) {
        LOG.warn("{} Error starting ReplicationEndpoint, retry", logPeerId(), e);
        replicationEndpoint.stop();
        if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
          sleepMultiplier++;
        } else {
          retryStartup.set(!this.abortOnError);
          setSourceStartupStatus(false);
          throw new RuntimeException("Exhausted retries to start replication endpoint.");
        }
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    sleepMultiplier = 1;
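    // Phase 2: block until the peer cluster's UUID can be fetched (which proves the peer is
    // reachable), then wire up the WAL entry filters and start one shipper per WAL group.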
    UUID peerClusterId;
    // delay this until we are in an asynchronous thread
    for (;;) {
      peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && peerClusterId == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("{} Could not connect to Peer ZK. Sleeping for {} millis", logPeerId(),
            (this.sleepForRetries * sleepMultiplier));
        }
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      } else {
        break;
      }
    }

    if (!this.isSourceActive()) {
      // this means the server is shutting down or the source is terminated, just give up
      // initializing
      setSourceStartupStatus(false);
      return;
    }

    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}", logPeerId(),
      this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId, peerClusterId);
    initializeWALEntryFilter(peerClusterId);
    // Start workers
    for (String walGroupId : logQueue.getQueues().keySet()) {
      tryStartNewShipper(walGroupId);
    }
    setSourceStartupStatus(false);
  }

  private synchronized void setSourceStartupStatus(boolean initializing) {
    startupOngoing.set(initializing);
    if (initializing) {
      metrics.incrSourceInitializing();
    } else {
      metrics.decrSourceInitializing();
    }
  }

  @Override
  public ReplicationSourceInterface startup() {
    if (this.sourceRunning) {
      return this;
    }
    this.sourceRunning = true;
    setSourceStartupStatus(true);
    initThread = new Thread(this::initialize);
    Threads.setDaemonThreadRunning(initThread,
      Thread.currentThread().getName() + ".replicationSource," + this.queueId, (t, e) -> {
        // if the first initialization attempt failed, and abortOnError is false, we will
        // keep looping in this thread until initialize eventually succeeds,
        // while the main server startup thread can go on with its work.
        sourceRunning = false;
        checkError(t, e);
        retryStartup.set(!this.abortOnError);
        do {
          if (retryStartup.get()) {
            this.sourceRunning = true;
            setSourceStartupStatus(true);
            retryStartup.set(false);
            try {
              initialize();
            } catch (Throwable error) {
              setSourceStartupStatus(false);
              checkError(t, error);
              retryStartup.set(!this.abortOnError);
            }
          }
        } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
      });
    return this;
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  @Override
  public void terminate(String reason, Exception cause, boolean clearMetrics) {
    terminate(reason, cause, clearMetrics, true);
  }

  public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
    if (cause == null) {
      LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
    } else {
      LOG.error(String.format("%s Closing source %s because an error occurred: %s", logPeerId(),
        this.queueId, reason), cause);
    }
    this.sourceRunning = false;
    if (initThread != null && Thread.currentThread() != initThread) {
      // This usually won't happen but anyway, let's wait until the initialization thread exits.
      // And notice that we may call terminate directly from the initThread so here we need to
      // avoid join on ourselves.
      initThread.interrupt();
      Threads.shutdown(initThread, this.sleepForRetries);
    }
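    // Shut down in order: signal shippers and their WAL readers to stop, stop the endpoint, then
    // interrupt anything that is still alive after a grace period.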
    Collection<ReplicationSourceShipper> workers = workerThreads.values();

    for (ReplicationSourceShipper worker : workers) {
      worker.stopWorker();
      if (worker.entryReader != null) {
        worker.entryReader.setReaderRunning(false);
      }
    }

    if (this.replicationEndpoint != null) {
      this.replicationEndpoint.stop();
    }

    for (ReplicationSourceShipper worker : workers) {
      if (worker.isAlive() || worker.entryReader.isAlive()) {
        try {
          // Wait for the worker to stop
          Thread.sleep(this.sleepForRetries);
        } catch (InterruptedException e) {
          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
          Thread.currentThread().interrupt();
        }
        // If the worker is still alive after waiting, interrupt it
        if (worker.isAlive()) {
          worker.interrupt();
        }
        // If the entry reader is still alive after waiting, interrupt it
        if (worker.entryReader.isAlive()) {
          worker.entryReader.interrupt();
        }
      }
      if (!server.isAborted() && !server.isStopped()) {
        // If the server is running and the worker has already stopped but there were still
        // entries batched, we need to clear the buffer used for unprocessed entries
        worker.clearWALEntryBatch();
      }
    }

    if (join) {
      for (ReplicationSourceShipper worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("{} ReplicationSourceWorker {} terminated", logPeerId(), worker.getName());
      }
      if (this.replicationEndpoint != null) {
        try {
          this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
            TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
          LOG.warn("{} Got exception while waiting for endpoint to shutdown "
            + "for replication source : {}", logPeerId(), this.queueId, te);
        }
      }
    }

    // Can be null in test context.
    if (this.metrics != null) {
      if (clearMetrics) {
        this.metrics.clear();
      } else {
        this.metrics.terminate();
      }
    }
  }

  @Override
  public String getQueueId() {
    return this.queueId;
  }

  @Override
  public String getPeerId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    // only for testing
    for (ReplicationSourceShipper worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) {
        return worker.getCurrentPath();
      }
    }
    return null;
  }

  @Override
  public boolean isSourceActive() {
    return !this.server.isStopped() && this.sourceRunning;
  }

  public ReplicationQueueInfo getReplicationQueueInfo() {
    return replicationQueueInfo;
  }

  public boolean isWorkerRunning() {
    for (ReplicationSourceShipper worker : this.workerThreads.values()) {
      if (worker.isActive()) {
        return worker.isActive();
      }
    }
    return false;
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
      .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceShipper> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceShipper worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
          .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log").append("\n");
      }
    }
    return sb.toString();
  }

  @Override
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  @Override
  // offsets totalBufferUsed by deducting shipped batchSize.
  public void postShipEdits(List<Entry> entries, long batchSize) {
    if (throttler.isEnabled()) {
      throttler.addPushSize(batchSize);
    }
    totalReplicatedEdits.addAndGet(entries.size());
    this.manager.releaseBufferQuota(batchSize);
  }

  @Override
  public WALFileLengthProvider getWALFileLengthProvider() {
    return walFileLengthProvider;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return server.getServerName();
  }

  Server getServer() {
    return server;
  }

  @Override
  public ReplicationQueueStorage getReplicationQueueStorage() {
    return queueStorage;
  }

  /** Returns String to use as a log prefix that contains current peerId. */
  public String logPeerId() {
    return "peerId=" + this.getPeerId() + ",";
  }

  // Visible for testing purposes
  public long getTotalReplicatedEdits() {
    return totalReplicatedEdits.get();
  }
}