001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.UUID;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
032import org.apache.hadoop.hbase.replication.ReplicationException;
033import org.apache.hadoop.hbase.replication.ReplicationPeer;
034import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
035import org.apache.hadoop.hbase.util.Pair;
036import org.apache.hadoop.hbase.wal.WAL.Entry;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * Interface that defines a replication source
041 */
042@InterfaceAudience.Private
043public interface ReplicationSourceInterface {
044  /**
045   * Initializer for the source
046   * @param conf    the configuration to use
047   * @param fs      the file system to use
048   * @param manager the manager to use
049   * @param server  the server for this region server
050   */
051  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
052    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
053    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
054    MetricsSource metrics) throws IOException;
055
056  /**
057   * Add a log to the list of logs to replicate
058   * @param log path to the log to replicate
059   */
060  void enqueueLog(Path log);
061
062  /**
063   * Add hfile names to the queue to be replicated.
064   * @param tableName Name of the table these files belongs to
065   * @param family    Name of the family these files belong to
066   * @param pairs     list of pairs of { HFile location in staging dir, HFile path in region dir
067   *                  which will be added in the queue for replication}
068   * @throws ReplicationException If failed to add hfile references
069   */
070  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
071    throws ReplicationException;
072
073  /**
074   * Start the replication
075   */
076  ReplicationSourceInterface startup();
077
078  /**
079   * End the replication
080   * @param reason why it's terminating
081   */
082  void terminate(String reason);
083
084  /**
085   * End the replication
086   * @param reason why it's terminating
087   * @param cause  the error that's causing it
088   */
089  void terminate(String reason, Exception cause);
090
091  /**
092   * End the replication
093   * @param reason       why it's terminating
094   * @param cause        the error that's causing it
095   * @param clearMetrics removes all metrics about this Source
096   */
097  void terminate(String reason, Exception cause, boolean clearMetrics);
098
099  /**
100   * Get the current log that's replicated
101   * @return the current log
102   */
103  Path getCurrentPath();
104
105  /**
106   * Get the queue id that the source is replicating to
107   * @return queue id
108   */
109  String getQueueId();
110
111  /**
112   * Get the id that the source is replicating to.
113   * @return peer id
114   */
115  String getPeerId();
116
117  /**
118   * Get a string representation of the current statistics for this source
119   * @return printable stats
120   */
121  String getStats();
122
123  /** Returns peer enabled or not */
124  boolean isPeerEnabled();
125
126  /** Returns active or not */
127  boolean isSourceActive();
128
129  /** Returns metrics of this replication source */
130  MetricsSource getSourceMetrics();
131
132  /** Returns the replication endpoint used by this replication source */
133  ReplicationEndpoint getReplicationEndpoint();
134
135  /** Returns the replication source manager */
136  ReplicationSourceManager getSourceManager();
137
138  /** Returns the wal file length provider */
139  WALFileLengthProvider getWALFileLengthProvider();
140
141  /**
142   * Try to throttle when the peer config with a bandwidth
143   * @param batchSize entries size will be pushed
144   */
145  void tryThrottle(int batchSize) throws InterruptedException;
146
147  /**
148   * Call this after the shipper thread ship some entries to peer cluster.
149   * @param entries   pushed
150   * @param batchSize entries size pushed
151   */
152  void postShipEdits(List<Entry> entries, long batchSize);
153
154  /**
155   * The queue of WALs only belong to one region server. This will return the server name which all
156   * WALs belong to.
157   * @return the server name which all WALs belong to
158   */
159  ServerName getServerWALsBelongTo();
160
161  /**
162   * get the stat of replication for each wal group.
163   * @return stat of replication
164   */
165  default Map<String, ReplicationStatus> getWalGroupStatus() {
166    return new HashMap<>();
167  }
168
169  /** Returns whether this is a replication source for recovery. */
170  default boolean isRecovered() {
171    return false;
172  }
173
174  /** Returns The instance of queueStorage used by this ReplicationSource. */
175  ReplicationQueueStorage getReplicationQueueStorage();
176
177  /**
178   * Log the current position to storage. Also clean old logs from the replication queue. Use to
179   * bypass the default call to
180   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface, WALEntryBatch)}
181   * whem implementation does not need to persist state to backing storage.
182   * @param entryBatch the wal entry batch we just shipped
183   * @return The instance of queueStorage used by this ReplicationSource.
184   */
185  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
186    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
187  }
188}