Class ReplicationSink
java.lang.Object
org.apache.hadoop.hbase.replication.regionserver.ReplicationSink
This class is responsible for replicating the edits coming from another cluster.
This replication process currently waits for the edits to be applied before the method returns. This means that the replication of edits is synchronized (after reading from WALs in ReplicationSource) and that a single region server cannot receive edits from two sources at the same time.
This class uses the native HBase client in order to replicate entries.
TODO: make this class more like ReplicationSource wrt log handling.
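To make the two points above concrete (native HBase client, edits applied before returning), here is a minimal sketch of that pattern. The GroupedReplication class and its pre-grouped Map<TableName, List<Row>> input are assumptions for illustration; they are not part of ReplicationSink's API, which works directly on protobuf WAL entries.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;

public final class GroupedReplication {
  private GroupedReplication() {}

  /**
   * Apply mutations that were decoded from another cluster's WAL entries,
   * grouped by destination table, one synchronous batch per table. The method
   * only returns once every batch has been applied, which mirrors the
   * behaviour the class Javadoc describes for the sink.
   */
  public static void apply(Connection conn, Map<TableName, List<Row>> rowsByTable)
      throws IOException, InterruptedException {
    for (Map.Entry<TableName, List<Row>> entry : rowsByTable.entrySet()) {
      try (Table table = conn.getTable(entry.getKey())) {
        List<Row> rows = new ArrayList<>(entry.getValue());
        // Table.batch blocks until the Puts/Deletes have been applied.
        table.batch(rows, new Object[rows.size()]);
      }
    }
  }
}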
Field Summary

private final org.apache.hadoop.conf.Configuration conf
private long hfilesReplicated
private static final org.slf4j.Logger LOG
private final MetricsSink metrics
private SourceFSConfigurationProvider provider
private boolean replicationSinkTrackerEnabled
private final int rowSizeWarnThreshold
    Row size threshold for multi requests above which a warning is logged
private final RegionServerCoprocessorHost rsServerHost
private AsyncClusterConnection
private final Object
private final AtomicLong totalReplicatedEdits
private WALEntrySinkFilter walEntrySinkFilter
Constructor Summary

ReplicationSink(org.apache.hadoop.conf.Configuration conf, RegionServerCoprocessorHost rsServerHost)
    Create a sink for replication
Method Summary

private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS, List<Pair<byte[], List<String>>> familyHFilePathsList)

private void addNewTableEntryInMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family, String pathToHfileFromNS, String tableName)

private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value)
    Simple helper to add to a map from key to (a list of) values. TODO: Make a general utility method

private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
    Do the changes and handle the pool

private void buildBulkLoadHFileMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld)

private void decorateConf()
    Decorate the Configuration object to make replication more receptive to delays: lessen the timeout and numTries.

private AsyncClusterConnection getConnection()

private String getHFilePath(TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family)

MetricsSink getSinkMetrics()
    Get replication Sink Metrics

String getStats()
    Get a string representation of this sink's metrics

private boolean isNewRowOrType(ExtendedCell previousCell, ExtendedCell cell)
    Returns True if we have crossed over onto a new row or type

private Put processReplicationMarkerEntry(...)

void replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries, ExtendedCellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath)
    Replicate this array of entries directly into the local cluster using the native client.

private WALEntrySinkFilter setupWALEntrySinkFilter()

void stopReplicationSinkServices()
    Stop the thread pool executor.

private UUID toUUID(org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID uuid)
Field Details

private static final org.slf4j.Logger LOG
private final org.apache.hadoop.conf.Configuration conf
private final MetricsSink metrics
private final AtomicLong totalReplicatedEdits
private long hfilesReplicated
private SourceFSConfigurationProvider provider
private WALEntrySinkFilter walEntrySinkFilter
private final int rowSizeWarnThreshold
    Row size threshold for multi requests above which a warning is logged
private boolean replicationSinkTrackerEnabled
private final RegionServerCoprocessorHost rsServerHost
Constructor Details

ReplicationSink
public ReplicationSink(org.apache.hadoop.conf.Configuration conf, RegionServerCoprocessorHost rsServerHost) throws IOException
Create a sink for replication
Parameters:
conf - conf object
Throws:
IOException - thrown when HDFS goes bad or bad file name
Method Details
setupWALEntrySinkFilter
Throws:
IOException
decorateConf
Decorate the Configuration object to make replication more receptive to delays: lessen the timeout and numTries.
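A minimal sketch of that decoration, assuming illustrative sink-side override keys ("replication.sink.client.*") and defaults; the exact keys and values ReplicationSink reads are not shown on this page:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

public final class SinkConfDecoration {
  private SinkConfDecoration() {}

  /**
   * Lower the client retry count and RPC timeout on the sink's Configuration
   * so that a slow or unreachable target table surfaces as a failure quickly
   * instead of stalling the sink. The override key names are assumptions.
   */
  public static void decorate(Configuration conf) {
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      conf.getInt("replication.sink.client.retries.number", 4));
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      conf.getInt("replication.sink.client.ops.timeout", 10000));
  }
}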
replicateEntries
public void replicateEntries(List<org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry> entries, ExtendedCellScanner cells, String replicationClusterId, String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException
Replicate this array of entries directly into the local cluster using the native client. Only operates against raw protobuf type saving on a conversion from pb to pojo.
Parameters:
entries - WAL entries to be replicated.
cells - cell scanner for iteration.
replicationClusterId - Id which will uniquely identify source cluster FS client configurations in the replication configuration directory
sourceBaseNamespaceDirPath - Path that points to the source cluster base namespace directory
sourceHFileArchiveDirPath - Path that points to the source cluster hfile archive directory
Throws:
IOException - If failed to replicate the data
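For orientation, a hedged sketch of the call shape on the receiving region server: create one sink, hand each incoming batch of protobuf WAL entries (plus their cell payload) to replicateEntries, and stop the sink on shutdown. The SinkLifecycleSketch wrapper and the null coprocessor host are assumptions made to keep the example self-contained; a real region server wires in its own RegionServerCoprocessorHost and configuration.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;

public final class SinkLifecycleSketch {
  private SinkLifecycleSketch() {}

  public static void run(List<WALEntry> entries, ExtendedCellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Passing null for the coprocessor host keeps the sketch compact and
    // assumes the sink tolerates it; a region server passes its own
    // RegionServerCoprocessorHost here.
    ReplicationSink sink = new ReplicationSink(conf, null);
    try {
      // Blocks until the edits have been applied to the local cluster.
      sink.replicateEntries(entries, cells, replicationClusterId,
        sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
      System.out.println(sink.getStats());
    } finally {
      sink.stopReplicationSinkServices();
    }
  }
}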
processReplicationMarkerEntry
Throws:
IOException
buildBulkLoadHFileMap
private void buildBulkLoadHFileMap(Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld) throws IOException
Throws:
IOException
addFamilyAndItsHFilePathToTableInMap
addNewTableEntryInMap
getHFilePath
private String getHFilePath(TableName table, org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor bld, String storeFile, byte[] family)
isNewRowOrType
Returns True if we have crossed over onto a new row or type
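A sketch of the check this implies, written against the public Cell API rather than ExtendedCell so it compiles outside HBase internals (the real method compares ExtendedCells):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

public final class RowTypeBoundary {
  private RowTypeBoundary() {}

  /**
   * True when the current cell starts a new row or carries a different
   * mutation type than the previous one, i.e. the point at which the cells
   * accumulated so far must be flushed into one Put/Delete and a new mutation
   * started.
   */
  public static boolean isNewRowOrType(Cell previousCell, Cell cell) {
    if (previousCell == null) {
      return true; // nothing accumulated yet
    }
    return previousCell.getType() != cell.getType()
      || !CellUtil.matchingRows(previousCell, cell);
  }
}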
toUUID
addToHashMultiMap
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value)
Simple helper to add to a map from key to (a list of) values. TODO: Make a general utility method
Returns:
the list of values corresponding to key1 and key2
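The same behaviour can be expressed with computeIfAbsent; this standalone sketch is illustrative rather than the sink's actual code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class HashMultiMap {
  private HashMultiMap() {}

  /**
   * Add a value under a two-level key and return the list it now belongs to.
   */
  public static <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map,
      K1 key1, K2 key2, V value) {
    List<V> values = map.computeIfAbsent(key1, k -> new HashMap<>())
      .computeIfAbsent(key2, k -> new ArrayList<>());
    values.add(value);
    return values;
  }
}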
stopReplicationSinkServices
Stop the thread pool executor. It is called when the regionserver is stopped.
batch
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) throws IOException
Do the changes and handle the pool
Parameters:
tableName - table to insert into
allRows - list of actions
batchRowSizeThreshold - rowSize threshold for batch mutation
Throws:
IOException
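A hedged sketch of what "do the changes" can look like using the blocking Table API instead of the sink's AsyncClusterConnection; flattening the row groups and the exact warning wording are illustrative assumptions:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BatchApply {
  private static final Logger LOG = LoggerFactory.getLogger(BatchApply.class);

  private BatchApply() {}

  /**
   * Flatten the groups of rows destined for one table, warn when the total
   * crosses the configured threshold, and apply everything synchronously.
   */
  public static void batch(Connection conn, TableName tableName,
      Collection<List<Row>> allRows, int batchRowSizeThreshold)
      throws IOException, InterruptedException {
    List<Row> rows = new ArrayList<>();
    for (List<Row> group : allRows) {
      rows.addAll(group);
    }
    if (rows.size() > batchRowSizeThreshold) {
      LOG.warn("Replicating a batch of {} rows to {}, above the warning threshold of {}",
        rows.size(), tableName, batchRowSizeThreshold);
    }
    try (Table table = conn.getTable(tableName)) {
      // Blocks until all Puts/Deletes in the batch have been applied.
      table.batch(rows, new Object[rows.size()]);
    }
  }
}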
getConnection
Throws:
IOException
getStats
Get a string representation of this sink's metrics
Returns:
string with the total replicated edits count and the date of the last edit that was applied
getSinkMetrics
Get replication Sink Metrics