/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class is responsible for managing all the replication sources. There are two classes of
 * sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and our only goal is to finish
 * replicating the WAL queue it had</li>
 * </ul>
 * <p>
 * When a region server dies, this class uses a watcher to get notified and it tries to grab a lock
 * in order to transfer all the queues into local old sources.
 * <p>
 * Synchronization specification:
 * <ul>
 * <li>No need to synchronize on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
 * is a Lock for each peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
 * operations.</li>
 * <li>Need to synchronize on {@link #walsById}. There are four methods which modify it,
 * {@link #addPeer(String)}, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
 * {@link #walsById} is a ConcurrentHashMap and there is a Lock for each peer id in
 * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
 * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
 * called by {@link ReplicationSourceInterface}, so there is no race with {@link #addPeer(String)}.
 * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} first, then
 * remove the wals from {@link #walsById}, so there is no race with {@link #removePeer(String)}
 * either. The only case that needs synchronization is between
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.</li>
 * <li>No need to synchronize on {@link #walsByIdRecoveredQueues}. There are three methods which
 * modify it, {@link #removePeer(String)},
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
 * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
 * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
 * {@link ReplicationSourceInterface} first, then remove the wals from
 * {@link #walsByIdRecoveredQueues}. And
 * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
 * {@link #walsByIdRecoveredQueues} first, then start up a {@link ReplicationSourceInterface}. So
 * there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and
 * {@link #removePeer(String)}, there is already synchronization on {@link #oldsources}. So there is
 * no need to synchronize on {@link #walsByIdRecoveredQueues}.</li>
 * <li>Need to synchronize on {@link #latestPaths} so that a newly opened source does not miss a new
 * log.</li>
 * <li>Need to synchronize on {@link #oldsources} to avoid adding a recovered source for a
 * to-be-removed peer.</li>
 * </ul>
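 * <p>
 * A rough sketch of how a region server typically drives this manager (illustrative only; call
 * sites are simplified, error handling is omitted, and the variable names are placeholders):
 *
 * <pre>
 * manager.init();                                    // create a source for every existing peer
 * manager.addPeer("example_peer");                   // a peer added by the user later on
 * // on every WAL roll:
 * manager.preLogRoll(newWalPath);                    // record the new WAL in the queue storage
 * manager.postLogRoll(newWalPath);                   // enqueue the new WAL on every normal source
 * // after a source ships a batch of edits:
 * manager.logPositionAndCleanOldLogs(source, batch);
 * manager.removePeer("example_peer");                // the peer is removed by the user
 * </pre>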
 */
@InterfaceAudience.Private
public class ReplicationSourceManager {
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
  // All the sources that read this RS's logs; every peer has exactly one replication source
  private final ConcurrentMap<String, ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;

  /**
   * Storage for queues that need persistence; e.g. Replication state so it can be recovered after
   * a crash. queueStorage upkeep is spread throughout this class and passed to ReplicationSource
   * instances for these to do updates themselves. Not all ReplicationSource instances keep state.
   */
  private final ReplicationQueueStorage queueStorage;

  private final ReplicationPeers replicationPeers;
  // UUID for this cluster
  private final UUID clusterId;
  // All about stopping
  private final Server server;

  // All logs we are currently tracking
  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
  // For a normal replication source, the peer id is the same as the queue id
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
  // Logs for recovered sources we are currently tracking
  // the map is: queue_id->logPrefix/logGroup->logs
  // For a recovered source, the queue id's format is peer_id-servername-*
  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;

  private final Configuration conf;
  private final FileSystem fs;
  // The paths to the latest log of each wal group, for newly added peers
  private final Map<String, Path> latestPaths;
  // Path to the wal directories
  private final Path logDir;
  // Path to the wal archive
  private final Path oldLogDir;
  private final WALFactory walFactory;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  private final boolean replicationForBulkLoadDataEnabled;

  private AtomicLong totalBufferUsed = new AtomicLong();
  // Total buffer size on this RegionServer for holding batched edits to be shipped.
  private final long totalBufferLimit;
  private final MetricsReplicationGlobalSourceSource globalMetrics;

  /**
   * A special ReplicationSource for hbase:meta Region Read Replicas. Usually this reference remains
   * empty. If an hbase:meta Region is opened on this server, we will create an instance of a
   * hbase:meta CatalogReplicationSource and it will live the life of the Server thereafter; i.e. we
   * will not shut it down even if the hbase:meta moves away from this server (in case it later gets
   * moved back). We synchronize on this instance when testing for presence and when creating it, so
   * that it is only created and started once.
   */
  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();

  /**
   * Creates a replication manager and sets the watch on all the other registered region servers
   * @param queueStorage the interface for manipulating replication queues
   * @param conf         the configuration to use
   * @param server       the server for this region server
   * @param fs           the file system to use
   * @param logDir       the directory that contains all wal directories of live RSs
   * @param oldLogDir    the directory where old logs are archived
   */
  public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
    ReplicationPeers replicationPeers, Configuration conf, Server server, FileSystem fs,
    Path logDir, Path oldLogDir, UUID clusterId, WALFactory walFactory,
    MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
    // The sources map is a ConcurrentHashMap, so it is thread-safe.
    // Generally, reading happens much more often than modifying.
    this.sources = new ConcurrentHashMap<>();
    this.queueStorage = queueStorage;
    this.replicationPeers = replicationPeers;
    this.server = server;
    this.walsById = new ConcurrentHashMap<>();
    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
    this.oldsources = new ArrayList<>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
                                                                                         // seconds
    this.clusterId = clusterId;
    this.walFactory = walFactory;
    // It's preferable to failover 1 RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // use a short 100ms sleep since this could be done inline with a RS startup
    // even if we fail, other region servers can take care of it
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    tfb.setDaemon(true);
    this.executor.setThreadFactory(tfb.build());
    this.latestPaths = new HashMap<>();
    replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
    this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
      HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
    this.globalMetrics = globalMetrics;
  }

  /**
   * Adds a normal source per registered peer cluster.
   */
  void init() throws IOException {
    for (String id : this.replicationPeers.getAllPeerIds()) {
      addSource(id);
      if (replicationForBulkLoadDataEnabled) {
        // Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
        // when a peer was added before replication for bulk loaded data was enabled.
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(id));
      }
    }
  }

  /**
   * <ol>
   * <li>Add peer to replicationPeers</li>
   * <li>Add the normal source and related replication queue</li>
   * <li>Add HFile Refs</li>
   * </ol>
   * @param peerId the id of replication peer
   */
  public void addPeer(String peerId) throws IOException {
    boolean added = false;
    try {
      added = this.replicationPeers.addPeer(peerId);
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    if (added) {
      addSource(peerId);
      if (replicationForBulkLoadDataEnabled) {
        throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
      }
    }
  }

  /**
   * <ol>
   * <li>Remove peer from replicationPeers</li>
   * <li>Remove all the recovered sources for the specified id and related replication queues</li>
   * <li>Remove the normal source and related replication queue</li>
   * <li>Remove HFile Refs</li>
   * </ol>
   * @param peerId the id of the replication peer
   */
  public void removePeer(String peerId) {
    replicationPeers.removePeer(peerId);
    String terminateMessage = "Replication stream was removed by a user";
    List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    // see NodeFailoverWorker.run
    synchronized (this.oldsources) {
      // First close all the recovered sources for this peer
      for (ReplicationSourceInterface src : oldsources) {
        if (peerId.equals(src.getPeerId())) {
          oldSourcesToDelete.add(src);
        }
      }
      for (ReplicationSourceInterface src : oldSourcesToDelete) {
        src.terminate(terminateMessage);
        removeRecoveredSource(src);
      }
    }
    LOG
      .info("Number of deleted recovered sources for " + peerId + ": " + oldSourcesToDelete.size());
    // Now close the normal source for this peer
    ReplicationSourceInterface srcToRemove = this.sources.get(peerId);
    if (srcToRemove != null) {
      srcToRemove.terminate(terminateMessage);
      removeSource(srcToRemove);
    } else {
      // This only happens in the unit test TestReplicationSourceManager#testPeerRemovalCleanup.
      // Delete the queue from storage and memory; the queue id is the same as the peer id for a
      // normal source
      deleteQueue(peerId);
      this.walsById.remove(peerId);
    }

    // Remove HFile Refs
    abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
  }

  /**
   * @param queueId the id of the replication queue to associate the ReplicationSource with.
   * @return a new 'classic' user-space replication source.
   * @see #createCatalogReplicationSource(RegionInfo) for creating a ReplicationSource for meta.
   */
  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
    throws IOException {
    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
    // Init the just created replication source. Pass the default walProvider's wal file length
    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
    // replication, see #createCatalogReplicationSource().
    WALFileLengthProvider walFileLengthProvider = this.walFactory.getWALProvider() != null
      ? this.walFactory.getWALProvider().getWALFileLengthProvider()
      : p -> OptionalLong.empty();
    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
      walFileLengthProvider, new MetricsSource(queueId));
    return src;
  }

  /**
   * Add a normal source for the given peer on this region server. Meanwhile, add a new replication
   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
   * group and do replication.
   * @param peerId the id of the replication peer
   * @return the source that was created
   */
  ReplicationSourceInterface addSource(String peerId) throws IOException {
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src = createSource(peerId, peer);
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      this.sources.put(peerId, src);
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      this.walsById.put(peerId, walsByGroup);
      // Add the latest wal to that source's queue
      if (!latestPaths.isEmpty()) {
        for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
          Path walPath = walPrefixAndPath.getValue();
          NavigableSet<String> wals = new TreeSet<>();
          wals.add(walPath.getName());
          walsByGroup.put(walPrefixAndPath.getKey(), wals);
          // Abort the RS and throw an exception to make the addPeer call fail
          abortAndThrowIOExceptionWhenFail(
            () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
          src.enqueueLog(walPath);
          LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
        }
      }
    }
    src.startup();
    return src;
  }

  /**
   * Close the previous replication sources of this peer id and open new sources to pick up the new
   * replication state or config changes. Here we do not need to change the replication queue
   * storage; we only need to enqueue all logs to the new replication source.
   * @param peerId the id of the replication peer
   */
  public void refreshSources(String peerId) throws IOException {
    String terminateMessage = "Peer " + peerId
      + " state or config changed. Will close the previous replication source and open a new one";
    ReplicationPeer peer = replicationPeers.getPeer(peerId);
    ReplicationSourceInterface src;
    // synchronized on latestPaths to avoid missing the new log
    synchronized (this.latestPaths) {
      ReplicationSourceInterface toRemove = this.sources.remove(peerId);
      if (toRemove != null) {
        LOG.info("Terminate replication source for " + toRemove.getPeerId());
        // Do not clear metrics
        toRemove.terminate(terminateMessage, null, false);
      }
      src = createSource(peerId, peer);
      this.sources.put(peerId, src);
      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
      }
    }
    LOG.info("Startup replication source for " + src.getPeerId());
    src.startup();

    List<ReplicationSourceInterface> toStartup = new ArrayList<>();
    // synchronized on oldsources to avoid race with NodeFailoverWorker
    synchronized (this.oldsources) {
      List<String> previousQueueIds = new ArrayList<>();
      for (Iterator<ReplicationSourceInterface> iter = this.oldsources.iterator(); iter
        .hasNext();) {
        ReplicationSourceInterface oldSource = iter.next();
        if (oldSource.getPeerId().equals(peerId)) {
          previousQueueIds.add(oldSource.getQueueId());
          oldSource.terminate(terminateMessage);
          iter.remove();
        }
      }
      for (String queueId : previousQueueIds) {
        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
        this.oldsources.add(recoveredReplicationSource);
        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
        }
        toStartup.add(recoveredReplicationSource);
      }
    }
    for (ReplicationSourceInterface replicationSource : toStartup) {
      replicationSource.startup();
    }
  }

  /**
   * Clear the metrics and related replication queue of the specified old source
   * @param src source to clear
   */
  void removeRecoveredSource(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getQueueId());
    this.oldsources.remove(src);
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsByIdRecoveredQueues.remove(src.getQueueId());
  }

  /**
   * Clear the metrics and related replication queue of the specified source
   * @param src source to clear
   */
  void removeSource(ReplicationSourceInterface src) {
    LOG.info("Done with the queue " + src.getQueueId());
    this.sources.remove(src.getPeerId());
    // Delete queue from storage and memory
    deleteQueue(src.getQueueId());
    this.walsById.remove(src.getQueueId());
  }

  /**
   * Delete a complete queue of wals associated with a replication source
   * @param queueId the id of the replication queue to delete
   */
  private void deleteQueue(String queueId) {
    abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), queueId));
  }

  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }

  /**
   * Refreshing a replication source will terminate the old source first, so the source thread may
   * be interrupted. We need to handle that case instead of aborting the region server.
   */
  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      if (
        e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
          && e.getCause().getCause() != null
          && e.getCause().getCause() instanceof InterruptedException
      ) {
        // A ReplicationRuntimeException (a RuntimeException) is thrown here because the thread was
        // interrupted deep down in the stack; it should bypass the following processing logic and
        // propagate to the topmost layer which can handle this exception properly. In this
        // specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated",
          e.getCause().getCause());
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void abortWhenFail(ReplicationQueueOperation op) {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
    }
  }

  private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
  }

  private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
    try {
      op.exec();
    } catch (ReplicationException e) {
      server.abort("Failed to operate on replication queue", e);
      throw new IOException(e);
    }
  }

  /**
   * Record the current replication position of the given source in the queue storage, and also
   * clean old logs from the replication queue.
   * @param entryBatch the wal entry batch we just shipped
   */
  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
    WALEntryBatch entryBatch) {
    String fileName = entryBatch.getLastWalPath().getName();
    String queueId = source.getQueueId();
    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId,
      fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
    cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, source.isRecovered());
  }

  /**
   * Cleans a log file and all older logs from the replication queue. Called when we are sure that
   * a log file is closed and has no more entries.
   * @param log            Path to the log
   * @param inclusive      whether we should also remove the given log file
   * @param queueId        id of the replication queue
   * @param queueRecovered Whether this is a recovered queue
   */
  void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
    if (queueRecovered) {
      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
      if (wals != null) {
        cleanOldLogs(wals, log, inclusive, queueId);
      }
    } else {
      // synchronized on walsById to avoid race with preLogRoll
      synchronized (this.walsById) {
        NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
        if (wals != null) {
          cleanOldLogs(wals, log, inclusive, queueId);
        }
      }
    }
  }

  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
    NavigableSet<String> walSet = wals.headSet(key, inclusive);
    if (walSet.isEmpty()) {
      return;
    }
    LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
    for (String wal : walSet) {
      interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
    }
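    // headSet() returns a view backed by 'wals', so clearing it below also drops these entries
    // from the in-memory set of tracked wals for this queue.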
    walSet.clear();
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void preLogRoll(Path newLog) throws IOException {
    String logName = newLog.getName();
    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
    // synchronized on latestPaths so that a newly opened source does not miss the new log
    synchronized (this.latestPaths) {
      // Add log to queue storage
      for (ReplicationSourceInterface source : this.sources.values()) {
        // If recording the log in queue storage fails, abort the RS and throw an exception to make
        // the log roll fail
        abortAndThrowIOExceptionWhenFail(
          () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
      }

      // synchronized on walsById to avoid race with cleanOldLogs
      synchronized (this.walsById) {
        // Update walsById map
        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
          .entrySet()) {
          String peerId = entry.getKey();
          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
          boolean existingPrefix = false;
          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
            SortedSet<String> wals = walsEntry.getValue();
            if (this.sources.isEmpty()) {
              // If there are no slaves, we don't need to keep the old wals since
              // we only consider the last one when a new slave comes in
              wals.clear();
            }
            if (logPrefix.equals(walsEntry.getKey())) {
              wals.add(logName);
              existingPrefix = true;
            }
          }
          if (!existingPrefix) {
            // The new log belongs to a new group, add it into this peer
            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
            NavigableSet<String> wals = new TreeSet<>();
            wals.add(logName);
            walsByPrefix.put(logPrefix, wals);
          }
        }
      }

      // Add to latestPaths
      latestPaths.put(logPrefix, newLog);
    }
  }

  // public because we call it in TestReplicationEmptyWALRecovery
  public void postLogRoll(Path newLog) throws IOException {
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.enqueueLog(newLog);
      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", newLog,
        source.getQueueId());
    }
  }

  void claimQueue(ServerName deadRS, String queue) {
    // Wait a bit before transferring the queues, we may be shutting down.
    // This sleep may not be enough in some cases.
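    // The actual delay is sleepBeforeFailover plus a random jitter of up to the same amount again,
    // which spreads concurrent claims from different region servers out in time.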
    try {
      Thread.sleep(sleepBeforeFailover
        + (long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting before transferring a queue.");
      Thread.currentThread().interrupt();
    }
    // We try to lock that RS's queue directory
    if (server.isStopped()) {
      LOG.info("Not transferring queue since we are shutting down");
      return;
    }
    // After claiming the queues from the dead region server, we will skip starting the
    // RecoveredReplicationSource if the peer has been removed. But it is possible that a peer with
    // peerId = 2 is removed and then a peer with peerId = 2 is added again during the failover. So
    // we need to get a copy of the replication peer first to decide whether we should start the
    // RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip
    // starting the RecoveredReplicationSource, otherwise the RS will abort (see HBASE-20475).
    String peerId = new ReplicationQueueInfo(queue).getPeerId();
    ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
    if (oldPeer == null) {
      LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
        peerId, queue);
      return;
    }
    Pair<String, SortedSet<String>> claimedQueue;
    try {
      claimedQueue = queueStorage.claimQueue(deadRS, queue, server.getServerName());
    } catch (ReplicationException e) {
      LOG.error(
        "ReplicationException: cannot claim dead region server ({})'s replication queue. Znode :"
          + " ({}) Possible solution: check if znode size exceeds jute.maxBuffer value. "
          + " If so, increase it for both client and server side.",
        deadRS, queueStorage.getRsNode(deadRS), e);
      server.abort("Failed to claim queue from dead regionserver.", e);
      return;
    }
    if (claimedQueue.getSecond().isEmpty()) {
      return;
    }
    String queueId = claimedQueue.getFirst();
    Set<String> walsSet = claimedQueue.getSecond();
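    // The identity comparison below is intentional: if the peer was removed and re-added while we
    // slept, replicationPeers now holds a different ReplicationPeerImpl instance, and we should
    // not start a recovered source against that new peer.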
    ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
    if (peer == null || peer != oldPeer) {
      LOG.warn("Skipping failover for peer {} of node {}, peer was removed or recreated", peerId,
        deadRS);
      abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
      return;
    }
    if (
      server instanceof ReplicationSyncUp.DummyServer
        && peer.getPeerState().equals(PeerState.DISABLED)
    ) {
      LOG.warn(
        "Peer {} is disabled. ReplicationSyncUp tool will skip " + "replicating data to this peer.",
        peerId);
      return;
    }

    ReplicationSourceInterface src;
    try {
      src = createSource(queueId, peer);
    } catch (IOException e) {
      LOG.error("Can not create replication source for peer {} and queue {}", peerId, queueId, e);
      server.abort("Failed to create replication source after claiming queue.", e);
      return;
    }
    // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
    synchronized (oldsources) {
      peer = replicationPeers.getPeer(src.getPeerId());
      if (peer == null || peer != oldPeer) {
        src.terminate("Recovered queue doesn't belong to any current peer");
        deleteQueue(queueId);
        return;
      }
      // track sources in walsByIdRecoveredQueues
      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
      walsByIdRecoveredQueues.put(queueId, walsByGroup);
      for (String wal : walsSet) {
        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
        NavigableSet<String> wals = walsByGroup.get(walPrefix);
        if (wals == null) {
          wals = new TreeSet<>();
          walsByGroup.put(walPrefix, wals);
        }
        wals.add(wal);
      }
      oldsources.add(src);
      LOG.info("Added source for recovered queue {}", src.getQueueId());
      for (String wal : walsSet) {
        LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
        src.enqueueLog(new Path(oldLogDir, wal));
      }
      src.startup();
    }
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    for (ReplicationSourceInterface source : this.sources.values()) {
      source.terminate("Region server is closing");
    }
    synchronized (oldsources) {
      for (ReplicationSourceInterface source : this.oldsources) {
        source.terminate("Region server is closing");
      }
    }
  }

  /**
   * Get a view of the wals of the normal sources on this rs
   * @return an unmodifiable map of queue id to wal group to a sorted set of wal names
   */
  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
    return Collections.unmodifiableMap(walsById);
  }

  /**
   * Get a view of the wals of the recovered sources on this rs
   * @return an unmodifiable map of queue id to wal group to a sorted set of wal names
   */
  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
    return Collections.unmodifiableMap(walsByIdRecoveredQueues);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all normal sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return new ArrayList<>(this.sources.values());
  }

  /**
   * Get a list of all the recovered sources of this rs
   * @return list of all recovered sources
   */
  public List<ReplicationSourceInterface> getOldSources() {
    return this.oldsources;
  }

  /**
   * Get the normal source for a given peer
   * @return the normal source for the given peer if it exists, otherwise null.
   */
  public ReplicationSourceInterface getSource(String peerId) {
    return this.sources.get(peerId);
  }

  List<String> getAllQueues() throws IOException {
    List<String> allQueues = Collections.emptyList();
    try {
      allQueues = queueStorage.getAllQueues(server.getServerName());
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    return allQueues;
  }

  int getSizeOfLatestPath() {
    synchronized (latestPaths) {
      return latestPaths.size();
    }
  }

  Set<Path> getLastestPath() {
    synchronized (latestPaths) {
      return Sets.newHashSet(latestPaths.values());
    }
  }

  public long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }

  /**
   * Returns the maximum size in bytes of edits held in memory which are pending replication across
   * all sources inside this RegionServer.
   */
  public long getTotalBufferLimit() {
    return totalBufferLimit;
  }

  /**
   * Get the directory where wals are archived
   * @return the directory where wals are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where wals are stored by their RSs
   * @return the directory where wals are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the file system used for wals
   * @return handle on the file system used for wals
   */
  public FileSystem getFs() {
    return this.fs;
  }

  /**
   * Get the ReplicationPeers used by this ReplicationSourceManager
   * @return the ReplicationPeers used by this ReplicationSourceManager
   */
  public ReplicationPeers getReplicationPeers() {
    return this.replicationPeers;
  }

  /**
   * Get a string representation of all the sources' metrics
   */
  public String getStats() {
    StringBuilder stats = new StringBuilder();
    // Print stats that apply across all Replication Sources
    stats.append("Global stats: ");
    stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=")
      .append(getTotalBufferLimit()).append("B\n");
    for (ReplicationSourceInterface source : this.sources.values()) {
      stats.append("Normal source for cluster " + source.getPeerId() + ": ");
      stats.append(source.getStats() + "\n");
    }
    for (ReplicationSourceInterface oldSource : oldsources) {
      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
      stats.append(oldSource.getStats() + "\n");
    }
    return stats.toString();
  }

  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    for (ReplicationSourceInterface source : this.sources.values()) {
      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
    }
  }

  public void cleanUpHFileRefs(String peerId, List<String> files) {
    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
  }

  int activeFailoverTaskCount() {
    return executor.getActiveCount();
  }

  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
    return this.globalMetrics;
  }

  /**
   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. Create it
   * once only. If it exists already, use the existing one.
   * @see #removeCatalogReplicationSource(RegionInfo)
   * @see #addSource(String) This is a specialization of the addSource method.
   */
  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
    throws IOException {
    // Poor-man's putIfAbsent
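    // Note: AtomicReference#getAndSet returns the previous value, so on the very first creation
    // the expression below evaluates to null even though the new source has been created, stored,
    // and started.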
    synchronized (this.catalogReplicationSource) {
      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
      return rs != null
        ? rs
        : this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
    }
  }

  /**
   * Remove the hbase:meta Catalog replication source. Called when we close hbase:meta.
   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
   */
  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
    // comes back to this server.
  }

  /**
   * Create, initialize, and start the Catalog ReplicationSource. Presumes called one-time only
   * (caller must ensure one-time only call). This ReplicationSource is NOT created via
   * {@link ReplicationSourceFactory}.
   * @see #addSource(String) This is a specialization of the addSource call.
   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
   *      why the special handling).
   */
  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
    throws IOException {
    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
    WALProvider walProvider = this.walFactory.getMetaWALProvider();
    boolean instantiate = walProvider == null;
    if (instantiate) {
      walProvider = this.walFactory.getMetaProvider();
    }
    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
    // read replicas feature that makes use of the source does a reset on a crash of the WAL
    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
    CatalogReplicationSourcePeer peer =
      new CatalogReplicationSourcePeer(this.conf, this.clusterId.toString());
    final ReplicationSourceInterface crs = new CatalogReplicationSource();
    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
    // Add listener on the provider so we can pick up the WAL to replicate on roll.
    WALActionsListener listener = new WALActionsListener() {
      @Override
      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
        crs.enqueueLog(newPath);
      }
    };
    walProvider.addWALActionsListener(listener);
    if (!instantiate) {
      // If we did not instantiate provider, need to add our listener on already-created WAL
      // instance too (listeners are passed by provider to WAL instance on creation but if provider
      // created already, our listener add above is missed). And add the current WAL file to the
      // Replication Source so it can start replicating it.
      WAL wal = walProvider.getWAL(regionInfo);
      wal.registerWALActionsListener(listener);
      crs.enqueueLog(((AbstractFSWAL) wal).getCurrentFileName());
    }
    return crs.startup();
  }

  ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

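  // The methods below implement the global buffer accounting for edits that have been read from
  // the WAL but not yet shipped. A rough usage sketch (illustrative only; the real call sites live
  // in the WAL reader/shipper classes, and 'batch', 'entry' and 'source' are placeholders):
  //
  //   if (manager.checkBufferQuota(peerId)) {             // room left under totalBufferLimit?
  //     boolean shipNow = manager.acquireWALEntryBufferQuota(batch, entry);
  //     // ... ship the batch (sooner if 'shipNow' is true) ...
  //     manager.releaseWALEntryBatchBufferQuota(batch);   // give the quota back once shipped
  //   }
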
  /**
   * Acquire the buffer quota for an {@link Entry} which is added to a {@link WALEntryBatch}.
   * @param entry the wal entry which is added to the {@link WALEntryBatch} and should acquire
   *              buffer quota.
   * @return true if we should clear the buffer and push all
   */
  boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, Entry entry) {
    long entrySize = walEntryBatch.incrementUsedBufferSize(entry);
    return this.acquireBufferQuota(entrySize);
  }

  /**
   * Release the buffer quota of a {@link WALEntryBatch} which was acquired by
   * {@link ReplicationSourceManager#acquireWALEntryBufferQuota}.
   * @return the released buffer quota size.
   */
  long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) {
    long usedBufferSize = walEntryBatch.getUsedBufferSize();
    if (usedBufferSize > 0) {
      this.releaseBufferQuota(usedBufferSize);
    }
    return usedBufferSize;
  }

  /**
   * Add the size to {@link ReplicationSourceManager#totalBufferUsed} and check if it exceeds
   * {@link ReplicationSourceManager#totalBufferLimit}.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   *         {@link ReplicationSourceManager#totalBufferLimit}; if so, we should stop increasing the
   *         buffer and ship all pending entries.
   */
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    long newBufferUsed = addTotalBufferUsed(size);
    return newBufferUsed >= totalBufferLimit;
  }

  /**
   * Release buffer quota which was acquired by
   * {@link ReplicationSourceManager#acquireBufferQuota}.
   */
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be less than 0");
    }
    addTotalBufferUsed(-size);
  }

  private long addTotalBufferUsed(long size) {
    if (size == 0) {
      return totalBufferUsed.get();
    }
    long newBufferUsed = totalBufferUsed.addAndGet(size);
    // Record the new buffer usage
    this.globalMetrics.setWALReaderEditsBufferBytes(newBufferUsed);
    return newBufferUsed;
  }


  /**
   * Check if {@link ReplicationSourceManager#totalBufferUsed} exceeds
   * {@link ReplicationSourceManager#totalBufferLimit} for the given peer.
   * @return true if {@link ReplicationSourceManager#totalBufferUsed} is not more than
   *         {@link ReplicationSourceManager#totalBufferLimit}.
   */
  boolean checkBufferQuota(String peerId) {
    // try not to go over total quota
    if (totalBufferUsed.get() > totalBufferLimit) {
      LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
        peerId, totalBufferUsed.get(), totalBufferLimit);
      return false;
    }
    return true;
  }
}