Class ReplicationSource
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSource
- All Implemented Interfaces:
ReplicationSourceInterface
- Direct Known Subclasses:
CatalogReplicationSource, RecoveredReplicationSource
Class that handles the source of a replication stream. Currently does not handle more than 1
slave cluster. For each slave cluster it selects a random subset of peers, sized by a replication
ratio. For example, if the replication ratio is 0.1 and the slave cluster has 100 region servers,
10 will be selected.
A stream is considered down when we cannot contact a region server on the peer cluster for more than 55 seconds (the default).
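The ratio arithmetic described above can be sketched as follows. This is only an illustration of the "0.1 of 100 servers gives 10" example, not the code HBase runs: the class name SinkRatioSketch, its method names, the rounding up, and the use of plain strings for server names are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper, not part of HBase: illustrates ratio-based peer selection.
final class SinkRatioSketch {

    // Number of peer region servers to use for a given ratio (rounded up, an assumption).
    static int sinksToUse(int peerRegionServers, double ratio) {
        return (int) Math.ceil(peerRegionServers * ratio);
    }

    // Picks a random subset of the peer's region servers, sized by the ratio.
    static List<String> chooseSinks(List<String> servers, double ratio, Random rnd) {
        List<String> shuffled = new ArrayList<>(servers);
        Collections.shuffle(shuffled, rnd);
        int count = sinksToUse(servers.size(), ratio);
        return new ArrayList<>(shuffled.subList(0, count));
    }

    public static void main(String[] args) {
        List<String> servers = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            servers.add("rs-" + i); // made-up server names
        }
        // With ratio 0.1 and 100 servers, 10 servers are chosen.
        System.out.println(chooseSinks(servers, 0.1, new Random()).size());
    }
}
```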
-
Field Summary
Modifier and Type   Field   Description
private boolean abortOnError
private final List<WALEntryFilter> baseFilterOutWALEntries - Base WALEntry filters for this class.
private UUID clusterId
protected org.apache.hadoop.conf.Configuration conf
private long currentBandwidth
static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS
private long defaultBandwidth
private final Predicate<org.apache.hadoop.fs.Path> filterInWALs - WALs to replicate.
protected org.apache.hadoop.fs.FileSystem fs
private Thread initThread
private static final org.slf4j.Logger LOG
protected ReplicationSourceLogQueue logQueue
protected ReplicationSourceManager manager
private int maxRetriesMultiplier
private MetricsSource metrics
private String peerId
protected String queueId
protected int queueSizePerGroup
protected ReplicationQueueStorage queueStorage
private ReplicationEndpoint replicationEndpoint
protected ReplicationPeer replicationPeer
protected ReplicationQueueInfo replicationQueueInfo
private AtomicBoolean retryStartup
protected Server server
private long sleepForRetries
(package private) boolean sourceRunning
private AtomicBoolean startupOngoing
private ReplicationThrottler throttler
private AtomicLong totalReplicatedEdits
static final String WAIT_ON_ENDPOINT_SECONDS
private int waitOnEndpointSeconds
protected WALEntryFilter walEntryFilter - A filter (or a chain of filters) for WAL entries; filters out edits.
private WALFileLengthProvider walFileLengthProvider
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads
-
Constructor Summary
Constructor   Description
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries)
-
Method Summary
Modifier and Type   Method   Description
void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) - Add hfile names to the queue to be replicated.
private void checkBandwidthChangeAndResetThrottler
private void checkError(Thread t, Throwable error)
protected ReplicationSourceShipper createNewShipper(String walGroupId)
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition)
private ReplicationEndpoint createReplicationEndpoint
private void decorateConf
void enqueueLog(org.apache.hadoop.fs.Path wal) - Add a log to the list of logs to replicate.
private long getCurrentBandwidth
org.apache.hadoop.fs.Path getCurrentPath - Get the current log that's replicated.
private long getFileSize(org.apache.hadoop.fs.Path currentPath)
getPeerId - Get the id that the source is replicating to.
getQueueId - Get the queue id that the source is replicating to.
Map<String, PriorityBlockingQueue<org.apache.hadoop.fs.Path>> getQueues
getReplicationEndpoint - Returns the replication endpoint used by this replication source.
getReplicationQueueInfo
getReplicationQueueStorage - Returns the instance of queueStorage used by this ReplicationSource.
(package private) Server getServer
getServerWALsBelongTo - The queue of WALs only belong to one region server.
getSourceManager - Returns the replication source manager.
getSourceMetrics - Returns metrics of this replication source.
getStats() - Get a string representation of the current statistics for this source.
long getTotalReplicatedEdits
(package private) WALEntryFilter getWalEntryFilter - Call after initializeWALEntryFilter(UUID), else it will be null.
getWALFileLengthProvider - Returns the wal file length provider.
getWalGroupStatus - Get the stat of replication for each wal group.
void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - Instantiation method used by region servers.
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
private void initialize
private void initializeWALEntryFilter(UUID peerClusterId)
boolean isPeerEnabled - Check whether the peer is enabled or not.
boolean isSourceActive - Returns active or not.
boolean isWorkerRunning
logPeerId - Returns String to use as a log prefix that contains current peerId.
void postShipEdits(List<WAL.Entry> entries, long batchSize) - Call this after the shipper thread ships some entries to the peer cluster.
private void retryRefreshing(Thread t, Throwable error)
private void setSourceStartupStatus(boolean initializing)
protected boolean sleepForRetries(String msg, int sleepMultiplier) - Do the sleeping logic.
startup() - Start the replication.
void terminate(reason) - End the replication.
void terminate(reason, cause) - End the replication.
void terminate(reason, cause, clearMetrics) - End the replication.
void terminate
private void tryStartNewShipper(String walGroupId)
void tryThrottle(int batchSize) - Try to throttle when the peer is configured with a bandwidth.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface
isRecovered, logPositionAndCleanOldLogs
-
Field Details
-
LOG
-
queueSizePerGroup
-
logQueue
-
queueStorage
-
replicationPeer
-
conf
-
replicationQueueInfo
-
peerId
-
manager
-
server
-
sleepForRetries
-
fs
-
clusterId
-
totalReplicatedEdits
-
queueId
-
maxRetriesMultiplier
-
sourceRunning
-
metrics
-
replicationEndpoint
-
abortOnError
-
startupOngoing
-
retryStartup
-
walEntryFilter
A filter (or a chain of filters) for WAL entries; filters out edits. -
throttler
-
defaultBandwidth
-
currentBandwidth
-
walFileLengthProvider
-
workerThreads
-
WAIT_ON_ENDPOINT_SECONDS
-
DEFAULT_WAIT_ON_ENDPOINT_SECONDS
-
waitOnEndpointSeconds
-
initThread
-
filterInWALs
WALs to replicate. Predicate that returns 'true' for WALs to replicate and false for WALs to skip. -
baseFilterOutWALEntries
Base WALEntry filters for this class. Unmodifiable. Set on construction. Filters *out* edits we do not want replicated, passed on to replication endpoints. This is the basic set. Down in #initializeWALEntryFilter this set is added to the end of the WALEntry filter chain. These are put after those that we pick up from the configured endpoints and other machinations to create the final walEntryFilter.
-
-
Constructor Details
-
ReplicationSource
-
ReplicationSource
ReplicationSource(Predicate<org.apache.hadoop.fs.Path> replicateWAL, List<WALEntryFilter> baseFilterOutWALEntries)
- Parameters:
replicateWAL - Pass a filter to run against WAL Path; filter *in* WALs to Replicate; i.e. return 'true' if you want to replicate the content of the WAL.
baseFilterOutWALEntries - Base set of filters you want applied always; filters *out* WALEntries so they never make it out of this ReplicationSource.
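The two constructor arguments work in opposite directions: replicateWAL filters WALs *in*, while baseFilterOutWALEntries filters entries *out* and is placed at the end of the filter chain. The sketch below illustrates that with stand-in types so that it runs without Hadoop or HBase on the classpath. Everything in it is hypothetical: strings stand in for both org.apache.hadoop.fs.Path and WAL entries, UnaryOperator<String> stands in for WALEntryFilter (returning null to drop an entry), and the ".meta" suffix rule is just an example predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in types; not the HBase classes.
final class WalFilterSketch {

    // Stand-in for the replicateWAL predicate: returns true for WALs to replicate.
    static final Predicate<String> REPLICATE_WAL = walName -> !walName.endsWith(".meta");

    // Builds the chain: endpoint-provided filters first, base filters at the end.
    static List<UnaryOperator<String>> buildChain(
            List<UnaryOperator<String>> endpointFilters,
            List<UnaryOperator<String>> baseFilters) {
        List<UnaryOperator<String>> chain = new ArrayList<>(endpointFilters);
        chain.addAll(baseFilters);
        return chain;
    }

    // Applies each filter in order; a null result means the entry was filtered out.
    static String applyChain(List<UnaryOperator<String>> chain, String entry) {
        String current = entry;
        for (UnaryOperator<String> filter : chain) {
            if (current == null) {
                return null;
            }
            current = filter.apply(current);
        }
        return current;
    }
}
```

A caller would first test the WAL name against the predicate and only then run each entry of an accepted WAL through the chain.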
-
-
Method Details
-
init
public void init(org.apache.hadoop.conf.Configuration conf, org.apache.hadoop.fs.FileSystem fs, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException
Instantiation method used by region servers.
- Specified by:
init in interface ReplicationSourceInterface
- Parameters:
conf - configuration to use
fs - file system to use
manager - replication manager to ping to
server - the server for this region server
queueId - the id of our replication queue
clusterId - unique UUID for the cluster
metrics - metrics for replication source
- Throws:
IOException
-
decorateConf
-
enqueueLog
Description copied from interface: ReplicationSourceInterface
Add a log to the list of logs to replicate.
- Specified by:
enqueueLog in interface ReplicationSourceInterface
- Parameters:
wal - path to the log to replicate
-
getQueues
-
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws ReplicationException
Description copied from interface: ReplicationSourceInterface
Add hfile names to the queue to be replicated.
- Specified by:
addHFileRefs in interface ReplicationSourceInterface
- Parameters:
tableName - Name of the table these files belong to
family - Name of the family these files belong to
pairs - list of pairs of { HFile location in staging dir, HFile path in region dir which will be added in the queue for replication }
- Throws:
ReplicationException - If failed to add hfile references
-
createReplicationEndpoint
-
initAndStartReplicationEndpoint
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint) throws IOException, TimeoutException
- Throws:
IOException
TimeoutException
-
initializeWALEntryFilter
-
tryStartNewShipper
-
getWalGroupStatus
Description copied from interface: ReplicationSourceInterface
Get the stat of replication for each wal group.
- Specified by:
getWalGroupStatus in interface ReplicationSourceInterface
- Returns:
stat of replication
-
getFileSize
- Throws:
IOException
-
createNewShipper
-
createNewWALReader
-
getWalEntryFilter
Call after initializeWALEntryFilter(UUID), else it will be null.
- Returns:
WAL Entry Filter Chain to use on WAL files filtering *out* WALEntry edits.
-
checkError
-
retryRefreshing
-
getReplicationEndpoint
Description copied from interface: ReplicationSourceInterface
Returns the replication endpoint used by this replication source.
- Specified by:
getReplicationEndpoint in interface ReplicationSourceInterface
-
getSourceManager
Description copied from interface: ReplicationSourceInterface
Returns the replication source manager.
- Specified by:
getSourceManager in interface ReplicationSourceInterface
-
tryThrottle
Description copied from interface: ReplicationSourceInterface
Try to throttle when the peer is configured with a bandwidth.
- Specified by:
tryThrottle in interface ReplicationSourceInterface
- Parameters:
batchSize - size of the entries that will be pushed
- Throws:
InterruptedException
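As a rough illustration of what bandwidth throttling of this kind can look like, the sketch below computes how long a caller would have to wait before pushing a batch so that a per-cycle byte budget is not exceeded. It is a generic budget-per-cycle scheme written for this page, not the ReplicationThrottler implementation; the class name ThrottleSketch, its fields, and the explicit nowMs argument (used instead of the system clock so the logic is easy to test) are all assumptions.

```java
// Hypothetical sketch of per-cycle bandwidth throttling; not the HBase ReplicationThrottler.
final class ThrottleSketch {
    private final long bytesPerCycle; // budget per cycle; <= 0 means throttling is disabled
    private final long cycleMs;       // cycle length in milliseconds
    private long cycleStartMs = 0;
    private long pushedInCycle = 0;

    ThrottleSketch(long bytesPerCycle, long cycleMs) {
        this.bytesPerCycle = bytesPerCycle;
        this.cycleMs = cycleMs;
    }

    // Returns the number of milliseconds to wait before pushing batchSize bytes.
    long delayBeforePush(int batchSize, long nowMs) {
        if (bytesPerCycle <= 0) {
            return 0; // no bandwidth configured, never throttle
        }
        if (nowMs - cycleStartMs >= cycleMs) {
            // The previous cycle is over: start a fresh one.
            cycleStartMs = nowMs;
            pushedInCycle = 0;
        }
        long delay = 0;
        if (pushedInCycle > 0 && pushedInCycle + batchSize > bytesPerCycle) {
            // Budget exhausted: wait for the next cycle and account the batch there.
            delay = cycleStartMs + cycleMs - nowMs;
            cycleStartMs += cycleMs;
            pushedInCycle = 0;
        }
        pushedInCycle += batchSize;
        return delay;
    }
}
```

A caller that wanted blocking behavior would sleep for the returned delay, which is where an InterruptedException could arise.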
-
checkBandwidthChangeAndResetThrottler
-
getCurrentBandwidth
-
sleepForRetries
Do the sleeping logic.
- Parameters:
msg - why we sleep
sleepMultiplier - by how many times the default sleeping time is augmented
- Returns:
true if sleepMultiplier is < maxRetriesMultiplier
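The contract above (sleep for the base time multiplied by sleepMultiplier, then report whether the multiplier is still below maxRetriesMultiplier) can be sketched like this. It is a minimal stand-alone illustration, not the class's actual code: the RetrySleepSketch class, its constructor, and the interrupt handling are assumptions, and only the field and parameter names are borrowed from this page.

```java
// Hypothetical stand-alone sketch of the sleepForRetries contract.
final class RetrySleepSketch {
    private final long sleepForRetries;     // base sleep time in milliseconds
    private final int maxRetriesMultiplier; // cap on the multiplier

    RetrySleepSketch(long sleepForRetries, int maxRetriesMultiplier) {
        this.sleepForRetries = sleepForRetries;
        this.maxRetriesMultiplier = maxRetriesMultiplier;
    }

    // Sleeps for sleepForRetries * sleepMultiplier and returns true if the
    // multiplier is still below the cap, i.e. the caller may increase it.
    boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            System.out.println(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
            Thread.sleep(sleepForRetries * sleepMultiplier);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it (a choice made for this sketch).
            Thread.currentThread().interrupt();
        }
        return sleepMultiplier < maxRetriesMultiplier;
    }
}
```

A retry loop would typically increment its multiplier only while the method returns true, so the sleep time grows linearly until it reaches the cap.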
-
isPeerEnabled
Check whether the peer is enabled or not.
- Specified by:
isPeerEnabled in interface ReplicationSourceInterface
- Returns:
true if the peer is enabled, otherwise false
-
initialize
-
setSourceStartupStatus
-
startup
Description copied from interface: ReplicationSourceInterface
Start the replication.
- Specified by:
startup in interface ReplicationSourceInterface
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication.
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication.
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
cause - the error that's causing it
-
terminate
Description copied from interface: ReplicationSourceInterface
End the replication.
- Specified by:
terminate in interface ReplicationSourceInterface
- Parameters:
reason - why it's terminating
cause - the error that's causing it
clearMetrics - removes all metrics about this Source
-
terminate
-
getQueueId
Description copied from interface: ReplicationSourceInterface
Get the queue id that the source is replicating to.
- Specified by:
getQueueId in interface ReplicationSourceInterface
- Returns:
queue id
-
getPeerId
Description copied from interface: ReplicationSourceInterface
Get the id that the source is replicating to.
- Specified by:
getPeerId in interface ReplicationSourceInterface
- Returns:
peer id
-
getCurrentPath
Description copied from interface: ReplicationSourceInterface
Get the current log that's replicated.
- Specified by:
getCurrentPath in interface ReplicationSourceInterface
- Returns:
the current log
-
isSourceActive
Description copied from interface: ReplicationSourceInterface
Returns active or not.
- Specified by:
isSourceActive in interface ReplicationSourceInterface
-
getReplicationQueueInfo
-
isWorkerRunning
-
getStats
Description copied from interface: ReplicationSourceInterface
Get a string representation of the current statistics for this source.
- Specified by:
getStats in interface ReplicationSourceInterface
- Returns:
printable stats
-
getSourceMetrics
Description copied from interface: ReplicationSourceInterface
Returns metrics of this replication source.
- Specified by:
getSourceMetrics in interface ReplicationSourceInterface
-
postShipEdits
Description copied from interface: ReplicationSourceInterface
Call this after the shipper thread ships some entries to the peer cluster.
- Specified by:
postShipEdits in interface ReplicationSourceInterface
- Parameters:
entries - the entries that were pushed
batchSize - size of the entries pushed
-
getWALFileLengthProvider
Description copied from interface: ReplicationSourceInterface
Returns the wal file length provider.
- Specified by:
getWALFileLengthProvider in interface ReplicationSourceInterface
-
getServerWALsBelongTo
Description copied from interface: ReplicationSourceInterface
The WALs in a queue belong to only one region server. This returns the name of the server that all the WALs belong to.
- Specified by:
getServerWALsBelongTo in interface ReplicationSourceInterface
- Returns:
the server name which all WALs belong to
-
getServer
-
getReplicationQueueStorage
Description copied from interface: ReplicationSourceInterface
Returns the instance of queueStorage used by this ReplicationSource.
- Specified by:
getReplicationQueueStorage in interface ReplicationSourceInterface
-
logPeerId
Returns String to use as a log prefix that contains current peerId. -
getTotalReplicatedEdits
-