Class ReplicationSourceManager
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
This class is responsible for managing all the replication sources. There are two classes of sources:
- Normal sources are persistent, and there is one per peer cluster.
- Old sources are recovered from a failed region server; their only goal is to finish replicating the WAL queue that server had.
When a region server dies, this class uses a watcher to get notified and tries to grab a lock in order to transfer all the queues into a local old source.
Synchronization specification:
- No need to synchronize on sources. sources is a ConcurrentHashMap and there is a Lock for the peer id in PeerProcedureHandlerImpl, so there is no race for peer operations.
- Need to synchronize on walsById. Four methods modify it: addPeer(String), removePeer(String), cleanOldLogs(NavigableSet, String, boolean, String) and preLogRoll(Path). walsById is a ConcurrentHashMap and there is a Lock for the peer id in PeerProcedureHandlerImpl, so there is no race between addPeer(String) and removePeer(String). cleanOldLogs(NavigableSet, String, boolean, String) is called by ReplicationSourceInterface, so there is no race with addPeer(String). removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsById, so there is no race with removePeer(String) either. The only case that needs synchronization is cleanOldLogs(NavigableSet, String, boolean, String) and preLogRoll(Path).
- No need to synchronize on walsByIdRecoveredQueues. Three methods modify it: removePeer(String), cleanOldLogs(NavigableSet, String, boolean, String) and claimQueue(ServerName, String). cleanOldLogs(NavigableSet, String, boolean, String) is called by ReplicationSourceInterface. removePeer(String) terminates the ReplicationSourceInterface first and then removes the wals from walsByIdRecoveredQueues, and claimQueue(ServerName, String) adds the wals to walsByIdRecoveredQueues first and then starts up a ReplicationSourceInterface, so there is no race here. claimQueue(ServerName, String) and removePeer(String) already synchronize on oldsources, so there is no need to synchronize on walsByIdRecoveredQueues.
- Need to synchronize on latestPaths so that a newly opened source does not miss a new log.
- Need to synchronize on oldsources to avoid adding a recovered source for a peer that is about to be removed.
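The last rule can be illustrated with a minimal sketch. It is not the HBase implementation: peers and recovered queues are modelled as plain strings, and the class name, the `peerId-deadServer` queue id format, and the method bodies are assumptions made only to show why both methods take the same lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in, not the HBase implementation: peers and recovered
// queues are plain strings so the locking pattern is easy to see.
class OldSourcesSketch {
  private final Set<String> peers = ConcurrentHashMap.newKeySet();
  private final List<String> oldsources = new ArrayList<>();

  void addPeer(String peerId) {
    peers.add(peerId);
  }

  // Drop the peer first, then remove its recovered queues under the lock.
  void removePeer(String peerId) {
    peers.remove(peerId);
    synchronized (oldsources) {
      oldsources.removeIf(queueId -> queueId.startsWith(peerId + "-"));
    }
  }

  // Re-check the peer while holding the same lock, so a recovered queue is
  // never left registered for a peer that removePeer has already dropped.
  boolean claimQueue(String peerId, String deadServer) {
    synchronized (oldsources) {
      if (!peers.contains(peerId)) {
        return false;
      }
      oldsources.add(peerId + "-" + deadServer);
      return true;
    }
  }

  List<String> getOldSources() {
    synchronized (oldsources) {
      return new ArrayList<>(oldsources);
    }
  }
}
```

Whichever method wins the lock, the outcome is consistent: either the claim sees the peer is gone and backs off, or the later removal sweeps the freshly added queue.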
-
Nested Class Summary
Modifier and Type, Class, Description:
private static interface ReplicationSourceManager.ReplicationQueueOperation
-
Field Summary
Modifier and Type, Field, Description:
(package private) AtomicReference<ReplicationSourceInterface> catalogReplicationSource - A special ReplicationSource for hbase:meta Region Read Replicas.
private final UUID clusterId
private final org.apache.hadoop.conf.Configuration conf
private final ThreadPoolExecutor executor
private final org.apache.hadoop.fs.FileSystem fs
private final MetricsReplicationGlobalSourceSource globalMetrics
private static final org.slf4j.Logger LOG
private final org.apache.hadoop.fs.Path logDir
private final org.apache.hadoop.fs.Path oldLogDir
private final List<ReplicationSourceInterface> oldsources
private final ReplicationQueueStorage queueStorage - Storage for queues that need persistence.
private final boolean replicationForBulkLoadDataEnabled
private final ReplicationPeers replicationPeers
private final Server server
private final long sleepBeforeFailover
private final ConcurrentMap<String, ReplicationSourceInterface> sources
private final long totalBufferLimit
private AtomicLong totalBufferUsed
private final WALFactory walFactory
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues
-
Constructor Summary
Constructor, Description:
ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, MetricsReplicationGlobalSourceSource globalMetrics) - Creates a replication manager and sets the watch on all the other registered region servers
-
Method Summary
Modifier and Type, Method, Description:
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
private void abortWhenFail
(package private) boolean acquireBufferQuota(long size) - Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
(package private) boolean acquireWALEntryBufferQuota(WALEntryBatch walEntryBatch, WAL.Entry entry) - Acquire the buffer quota for WAL.Entry which is added to WALEntryBatch.
(package private) int activeFailoverTaskCount()
ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo) - Add an hbase:meta Catalog replication source.
void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs)
void addPeer(String peerId) - Add the peer to replicationPeers, add its normal source and related replication queue, and add HFile Refs.
(package private) ReplicationSourceInterface addSource - Add a normal source for the given peer on this region server.
private long addTotalBufferUsed(long size)
(package private) boolean checkBufferQuota(String peerId) - Check if totalBufferUsed exceeds totalBufferLimit for peer.
(package private) void claimQueue(ServerName deadRS, String queue)
(package private) void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) - Cleans a log file and all older logs from replication queue.
private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id)
void cleanUpHFileRefs(String peerId, List<String> files)
private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo) - Create, initialize, and start the Catalog ReplicationSource.
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
private void deleteQueue(String queueId) - Delete a complete queue of wals associated with a replication source
org.apache.hadoop.fs.FileSystem getFs() - Get the handle on the local file system
(package private) MetricsReplicationGlobalSourceSource getGlobalMetrics
(package private) Set<org.apache.hadoop.fs.Path> getLastestPath()
org.apache.hadoop.fs.Path getLogDir - Get the directory where wals are stored by their RSs
org.apache.hadoop.fs.Path getOldLogDir - Get the directory where wals are archived
getOldSources - Get a list of all the recovered sources of this rs
(package private) ReplicationQueueStorage getQueueStorage
getReplicationPeers - Get the ReplicationPeers used by this ReplicationSourceManager
(package private) int getSizeOfLatestPath()
getSource - Get the normal source for a given peer
getSources - Get a list of all the normal sources of this rs
getStats() - Get a string representation of all the sources' metrics
long getTotalBufferLimit - Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer.
long getTotalBufferUsed
getWALs() - Get a copy of the wals of the normal sources on this rs
getWalsByIdRecoveredQueues - Get a copy of the wals of the recovered sources on this rs
(package private) void init() - Adds a normal source per registered peer cluster.
private void interruptOrAbortWhenFail - Refresh replication source will terminate the old source first, then the source thread will be interrupted.
void join() - Terminate the replication on this region server
void logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch) - This method will log the current position to storage.
void postLogRoll(org.apache.hadoop.fs.Path newLog)
void preLogRoll(org.apache.hadoop.fs.Path newLog)
void refreshSources(String peerId) - Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes.
(package private) void releaseBufferQuota(long size) - Release the buffer quota that was acquired by acquireBufferQuota(long).
(package private) long releaseWALEntryBatchBufferQuota(WALEntryBatch walEntryBatch) - Release the buffer quota of the WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
void removeCatalogReplicationSource(RegionInfo regionInfo) - Remove the hbase:meta Catalog replication source.
void removePeer(String peerId) - Remove the peer from replicationPeers, remove its recovered and normal sources with their replication queues, and remove HFile Refs.
(package private) void removeRecoveredSource - Clear the metrics and related replication queue of the specified old source
(package private) void removeSource - Clear the metrics and related replication queue of the specified old source
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op)
-
Field Details
-
LOG
-
sources
-
oldsources
-
queueStorage
Storage for queues that need persistence, e.g. replication state, so that they can be recovered after a crash. queueStorage upkeep is spread about this class and passed to ReplicationSource instances so that they can do updates themselves. Not all ReplicationSource instances keep state.
-
replicationPeers
-
clusterId
-
server
-
walsById
-
walsByIdRecoveredQueues
-
conf
-
fs
-
latestPaths
-
logDir
-
oldLogDir
-
walFactory
-
sleepBeforeFailover
-
executor
-
replicationForBulkLoadDataEnabled
-
totalBufferUsed
-
totalBufferLimit
-
globalMetrics
-
catalogReplicationSource
A special ReplicationSource for hbase:meta Region Read Replicas. Usually this reference remains empty. If an hbase:meta Region is opened on this server, we will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from this server (in case it later gets moved back). We synchronize on this instance when testing for presence and, if absent, while creating, so that the source is created and started only once.
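The create-once behaviour described for this field can be sketched as below. This is a generic illustration under stated assumptions, not the class's actual code: `CreateOnceSketch`, `getOrCreate` and the `Supplier` factory are hypothetical names standing in for the catalog source and its creation method.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: holds a lazily created value in an AtomicReference and
// synchronizes on that reference for the "test, then create" step.
class CreateOnceSketch<T> {
  private final AtomicReference<T> ref = new AtomicReference<>();

  T getOrCreate(Supplier<T> factory) {
    synchronized (ref) {
      T existing = ref.get();
      if (existing != null) {
        return existing; // already created; reuse it
      }
      T created = factory.get(); // runs at most once as long as it returns non-null
      ref.set(created);
      return created;
    }
  }
}
```

Holding the monitor across both the presence test and the creation is what prevents two threads from each starting their own instance.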
-
-
Constructor Details
-
ReplicationSourceManager
public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, org.apache.hadoop.conf.Configuration conf, Server server, org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path logDir, org.apache.hadoop.fs.Path oldLogDir, UUID clusterId, WALFactory walFactory, MetricsReplicationGlobalSourceSource globalMetrics) throws IOException
Creates a replication manager and sets the watch on all the other registered region servers
- Parameters:
queueStorage
- the interface for manipulating replication queues
conf
- the configuration to use
server
- the server for this region server
fs
- the file system to use
logDir
- the directory that contains all wal directories of live RSs
oldLogDir
- the directory where old logs are archived
- Throws:
IOException
-
-
Method Details
-
init
Adds a normal source per registered peer cluster.
- Throws:
IOException
-
addPeer
1. Add peer to replicationPeers
2. Add the normal source and related replication queue
3. Add HFile Refs
- Parameters:
peerId
- the id of replication peer
- Throws:
IOException
-
removePeer
1. Remove peer from replicationPeers
2. Remove all the recovered sources for the specified id and related replication queues
3. Remove the normal source and related replication queue
4. Remove HFile Refs
- Parameters:
peerId
- the id of the replication peer
-
createSource
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException
- Parameters:
queueId
- the id of the replication queue to associate the ReplicationSource with.
- Returns:
- a new 'classic' user-space replication source.
- Throws:
IOException
-
addSource
Add a normal source for the given peer on this region server. Meanwhile, add a new replication queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal group and do replication.
- Parameters:
peerId
- the id of the replication peer
- Returns:
- the source that was created
- Throws:
IOException
-
refreshSources
Close the previous replication sources of this peer id and open new sources to trigger the new replication state changes or new replication config changes. Here we do not need to change the replication queue storage; we only need to enqueue all logs to the new replication source.
- Parameters:
peerId
- the id of the replication peer
- Throws:
IOException
-
removeRecoveredSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src
- source to clear
-
removeSource
Clear the metrics and related replication queue of the specified old source
- Parameters:
src
- source to clear
-
deleteQueue
Delete a complete queue of wals associated with a replication source
- Parameters:
queueId
- the id of replication queue to delete
-
interruptOrAbortWhenFail
Refreshing a replication source terminates the old source first, and the source thread is then interrupted. This interruption needs to be handled here instead of aborting the region server. -
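One way to picture this behaviour is the sketch below. It is an assumption-laden illustration rather than the method's real body: `QueueOperation`, `Abortable`, the cause-chain check and the use of `IllegalStateException` are all hypothetical stand-ins for the class's private operation interface, the server's abort hook, and whatever exception the real code raises.

```java
// Hypothetical types for illustration; the real class uses its own
// ReplicationQueueOperation interface and the region server's abort hook.
class QueueFailureSketch {
  interface QueueOperation {
    void exec() throws Exception;
  }

  interface Abortable {
    void abort(String why, Throwable cause);
  }

  // True when an InterruptedException appears anywhere in the cause chain.
  static boolean causedByInterrupt(Throwable t) {
    for (Throwable c = t; c != null; c = c.getCause()) {
      if (c instanceof InterruptedException) {
        return true;
      }
    }
    return false;
  }

  static void interruptOrAbortWhenFail(QueueOperation op, Abortable server) {
    try {
      op.exec();
    } catch (Exception e) {
      if (causedByInterrupt(e)) {
        // Expected while a source is being refreshed: surface it to the
        // source thread instead of taking the whole server down.
        throw new IllegalStateException("Interrupted while updating the replication queue", e);
      }
      server.abort("Failed to operate on replication queue", e);
    }
  }
}
```

Any failure that is not traceable to an interrupt still reaches the abort path, so genuine storage errors are not silently swallowed.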
abortWhenFail
-
throwIOExceptionWhenFail
private void throwIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
abortAndThrowIOExceptionWhenFail
private void abortAndThrowIOExceptionWhenFail(ReplicationSourceManager.ReplicationQueueOperation op) throws IOException
- Throws:
IOException
-
logPositionAndCleanOldLogs
This method will log the current position to storage and also clean old logs from the replication queue.
- Parameters:
entryBatch
- the wal entry batch we just shipped
-
cleanOldLogs
Cleans a log file and all older logs from replication queue. Called when we are sure that a log file is closed and has no more entries.
- Parameters:
log
- Path to the log
inclusive
- whether we should also remove the given log file
queueId
- id of the replication queue
queueRecovered
- Whether this is a recovered queue
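The effect of the inclusive flag can be shown with a small sketch over a sorted set of WAL names. It assumes the names sort oldest first and uses a hypothetical helper class; it touches neither queue storage nor the source's metrics, which the real method also has to deal with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

// Hypothetical helper: works on an in-memory sorted set of WAL names only.
class CleanOldLogsSketch {
  // Removes every name that sorts before `log`, plus `log` itself when
  // `inclusive` is true, and returns the removed names in order.
  static List<String> cleanOldLogs(NavigableSet<String> wals, String log, boolean inclusive) {
    NavigableSet<String> older = wals.headSet(log, inclusive); // live view of the set
    List<String> removed = new ArrayList<>(older);
    older.clear(); // clearing the view removes the entries from `wals`
    return removed;
  }
}
```

Because `headSet` returns a view backed by the original set, clearing it is enough to drop the old entries in one step.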
-
preLogRoll
- Throws:
IOException
-
postLogRoll
- Throws:
IOException
-
claimQueue
-
join
Terminate the replication on this region server
-
getWALs
Get a copy of the wals of the normal sources on this rs
- Returns:
- a sorted set of wal names
-
getWalsByIdRecoveredQueues
Get a copy of the wals of the recovered sources on this rs
- Returns:
- a sorted set of wal names
-
getSources
Get a list of all the normal sources of this rs
- Returns:
- list of all normal sources
-
getOldSources
Get a list of all the recovered sources of this rs
- Returns:
- list of all recovered sources
-
getSource
Get the normal source for a given peer
- Returns:
- the normal source for the given peer if it exists, otherwise null.
-
getAllQueues
- Throws:
IOException
-
getSizeOfLatestPath
int getSizeOfLatestPath()
-
getLastestPath
Set<org.apache.hadoop.fs.Path> getLastestPath()
-
getTotalBufferUsed
-
getTotalBufferLimit
Returns the maximum size in bytes of edits held in memory which are pending replication across all sources inside this RegionServer.
-
getOldLogDir
Get the directory where wals are archived
- Returns:
- the directory where wals are archived
-
getLogDir
Get the directory where wals are stored by their RSs
- Returns:
- the directory where wals are stored by their RSs
-
getFs
Get the handle on the local file system
- Returns:
- Handle on the local file system
-
getReplicationPeers
Get the ReplicationPeers used by this ReplicationSourceManager
- Returns:
- the ReplicationPeers used by this ReplicationSourceManager
-
getStats
Get a string representation of all the sources' metrics
-
addHFileRefs
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path>> pairs) throws IOException
- Throws:
IOException
-
cleanUpHFileRefs
-
activeFailoverTaskCount
int activeFailoverTaskCount() -
getGlobalMetrics
-
addCatalogReplicationSource
public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo) throws IOException
Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region. Create it once only. If exists already, use the existing one.
-
removeCatalogReplicationSource
Remove the hbase:meta Catalog replication source. Called when we close hbase:meta.
-
createCatalogReplicationSource
private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo) throws IOException
Create, initialize, and start the Catalog ReplicationSource. Presumes called one-time only (caller must ensure one-time only call). This ReplicationSource is NOT created via ReplicationSourceFactory.
-
getQueueStorage
-
acquireWALEntryBufferQuota
Acquire the buffer quota for WAL.Entry which is added to WALEntryBatch.
- Parameters:
entry
- the wal entry which is added to WALEntryBatch and should acquire buffer quota.
- Returns:
- true if we should clear buffer and push all
-
releaseWALEntryBatchBufferQuota
Release the buffer quota of the WALEntryBatch that was acquired by acquireWALEntryBufferQuota(org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch, org.apache.hadoop.hbase.wal.WAL.Entry).
- Returns:
- the released buffer quota size.
-
acquireBufferQuota
Add the size to totalBufferUsed and check if it exceeds totalBufferLimit.
- Returns:
- true if totalBufferUsed exceeds totalBufferLimit, in which case we should stop increasing the buffer and ship all.
-
releaseBufferQuota
Release the buffer quota that was acquired by acquireBufferQuota(long).
-
addTotalBufferUsed
-
checkBufferQuota
Check if totalBufferUsed exceeds totalBufferLimit for peer.
- Returns:
- true if totalBufferUsed is not more than totalBufferLimit.
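The acquire, release and check methods above can be read as simple accounting on a shared counter. The following is a minimal sketch under that reading: the class name and constructor are hypothetical, the comparisons follow the wording of these descriptions ("exceeds" and "not more than"), and the peer-specific logging and metrics of the real class are left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the buffer accounting described above.
class BufferQuotaSketch {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferLimit;

  BufferQuotaSketch(long totalBufferLimit) {
    this.totalBufferLimit = totalBufferLimit;
  }

  // Adds size to the running total; true means the limit is exceeded and the
  // caller should stop buffering and ship what it has.
  boolean acquireBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be negative: " + size);
    }
    return totalBufferUsed.addAndGet(size) > totalBufferLimit;
  }

  // Gives back quota previously taken with acquireBufferQuota.
  void releaseBufferQuota(long size) {
    if (size < 0) {
      throw new IllegalArgumentException("size should not be negative: " + size);
    }
    totalBufferUsed.addAndGet(-size);
  }

  // True while the total in use is not more than the limit.
  boolean checkBufferQuota() {
    return totalBufferUsed.get() <= totalBufferLimit;
  }

  long getTotalBufferUsed() {
    return totalBufferUsed.get();
  }
}
```

Using a single AtomicLong keeps the total consistent across all sources on the server without a lock around each update.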
-