/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

/**
 * Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
 * <p>
 * Implements {@link PropagatingConfigurationObserver} mainly for registering
 * {@link ReplicationPeers}, so that we can recreate the replication peer storage.
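 * <p>
 * The hosting region server drives the lifecycle: {@link #initialize} is called first, then
 * {@link #startReplicationService()}, and {@link #stopReplicationService()} on shutdown.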
 */
@InterfaceAudience.Private
public class Replication
  implements ReplicationSourceService, ReplicationSinkService, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
  private boolean isReplicationForBulkLoadDataEnabled;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueueStorage queueStorage;
  private ReplicationPeers replicationPeers;
  private volatile Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;
  private MetricsReplicationGlobalSourceSource globalMetricsSource;

  private PeerProcedureHandler peerProcedureHandler;

  /**
   * Empty constructor
   */
  public Replication() {
  }

  @Override
  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
    WALFactory walFactory) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.isReplicationForBulkLoadDataEnabled =
      ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
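    // Single daemon thread used to periodically log replication statistics
    // (see ReplicationStatisticsTask below).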
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true).build());
    if (this.isReplicationForBulkLoadDataEnabled) {
      if (
        conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()
      ) {
        throw new IllegalArgumentException(
          HConstants.REPLICATION_CLUSTER_ID + " cannot be null/empty when "
            + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY + " is set to true.");
      }
    }

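    // Create the replication queue storage and initialize the replication peer tracking.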
    try {
      this.queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
      this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getFileSystem(),
        server.getZooKeeper(), this.conf);
      this.replicationPeers.init();
    } catch (Exception e) {
      throw new IOException("Failed to create replication handlers", e);
    }
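    // Read this cluster's UUID from ZooKeeper; it is handed to the replication sources below.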
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
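    // Global metrics source shared by all replication sources on this region server.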
    this.globalMetricsSource = CompatibilitySingletonFactory
      .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
    this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, conf,
      this.server, fs, logDir, oldLogDir, clusterId, walFactory, globalMetricsSource);
    // Get the user-space WAL provider
    WALProvider walProvider = walFactory != null ? walFactory.getWALProvider() : null;
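    // Register a listener so the replication sources are notified of WAL events such as log rolls.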
    if (walProvider != null) {
      walProvider
        .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
    }
    this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();

    this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
  }

  @Override
  public PeerProcedureHandler getPeerProcedureHandler() {
    return peerProcedureHandler;
  }

  /**
   * Stops replication service.
   */
  @Override
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carry the list of log entries down to the sink
   * @param entries                    list of entries to replicate
   * @param cells                      The data -- the cells -- that <code>entries</code> describes
   *                                   (the entries do not contain the Cells we are replicating;
   *                                   they are passed here on the side in this CellScanner).
   * @param replicationClusterId       Id which will uniquely identify source cluster FS client
   *                                   configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *                                   directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath  Path that points to the source cluster hfile archive
   *                                   directory
   */
  @Override
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the replication sources, creates the replication
   * sink, and schedules the periodic statistics task.
   */
  @Override
  public void startReplicationService() throws IOException {
    this.replicationManager.init();
    RegionServerCoprocessorHost rsServerHost = null;
    if (server instanceof HRegionServer) {
      rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
    }
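    // Create the sink that applies edits shipped from peer clusters, passing the region server
    // coprocessor host when running inside an HRegionServer.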
    this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    LOG.info("{} started", this.server.toString());
  }

  /**
   * Get the replication sources manager
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

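  /**
   * Adds the given hfile references to the replication queue so they can be replicated to peer
   * clusters as part of bulk load replication.
   */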
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
    throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (IOException e) {
      LOG.error("Failed to add hfile references to the replication queue.", e);
      throw e;
    }
  }

  /**
   * Statistics task. Periodically prints the replication statistics to the log.
   */
  private static final class ReplicationStatisticsTask implements Runnable {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsTask(ReplicationSink replicationSink,
      ReplicationSourceManager replicationManager) {
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
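    // Combine metrics from both the active sources and the recovered ("old") sources.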
    List<ReplicationSourceInterface> allSources = new ArrayList<>();
    allSources.addAll(this.replicationManager.getSources());
    allSources.addAll(this.replicationManager.getOldSources());

    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public void registerChildren(ConfigurationManager manager) {
    manager.registerObserver(replicationPeers);
  }

  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    manager.deregisterObserver(replicationPeers);
  }
}