Class AsyncFSWAL
java.lang.Object
org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL<WALProvider.AsyncWriter>
org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL
- All Implemented Interfaces: Closeable, AutoCloseable, WALFileLengthProvider, WAL
@LimitedPrivate("Configuration")
public class AsyncFSWAL
extends AbstractFSWAL<WALProvider.AsyncWriter>
An asynchronous implementation of FSWAL.
Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
For append, we process it as follows:
- In the caller thread (typically, the rpc handler thread):
  - Insert the entry into 'waitingConsumePayloads'. Use the ring buffer sequence as the txid.
  - Schedule the consumer task if needed. See shouldScheduleConsumer() for more details.
- In the consumer task (executed in a single-threaded thread pool):
  - Poll the entry from waitingConsumePayloads and insert it into toWriteAppends.
  - Poll the entry from toWriteAppends, append it to the AsyncWriter, and insert it into unackedAppends.
  - If the buffered size reaches batchSize, or there is a sync request, call sync on the AsyncWriter.
- In the callback methods:
  - If the sync succeeded, poll the entry from unackedAppends and drop it.
  - If the sync failed, add all the entries in unackedAppends back to toWriteAppends and wait to write them again.
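The append flow above can be modelled with plain JDK collections. The following is a minimal, single-threaded sketch under stated assumptions: `Entry` is a hypothetical stand-in for FSWALEntry, a `ConcurrentLinkedQueue` stands in for the ring buffer, "appending to the writer" only adds the entry's size to a counter, and the two sync callbacks are invoked directly rather than by a real AsyncWriter. Only the queue names and the batchSize rule come from the description.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified model of the append path. Field names mirror the Javadoc,
// but the types and logic are illustrative assumptions, not the real AsyncFSWAL code.
final class AppendPipelineSketch {
  // Hypothetical stand-in for FSWALEntry: a txid plus the entry's size in bytes.
  record Entry(long txid, long size) {}

  // Plays the role of the ring buffer: caller threads offer, the single consumer polls.
  final ConcurrentLinkedQueue<Entry> waitingConsumePayloads = new ConcurrentLinkedQueue<>();
  final Deque<Entry> toWriteAppends = new ArrayDeque<>();
  final Deque<Entry> unackedAppends = new ArrayDeque<>();
  private final long batchSize;
  private long bufferedSize; // bytes appended since the last sync
  int syncCount;             // how many times the sketch "called sync on the writer"

  AppendPipelineSketch(long batchSize) {
    this.batchSize = batchSize;
  }

  // Caller thread: here the caller supplies the txid (the real class uses the ring
  // buffer sequence); scheduling the consumer is left out of this sketch.
  void append(long txid, long size) {
    waitingConsumePayloads.offer(new Entry(txid, size));
  }

  // Consumer task: payloads -> toWriteAppends -> "append to writer" -> unackedAppends,
  // then sync if the buffered size reached batchSize or a sync was requested.
  void consume(boolean syncRequested) {
    Entry e;
    while ((e = waitingConsumePayloads.poll()) != null) {
      toWriteAppends.addLast(e);
    }
    while ((e = toWriteAppends.pollFirst()) != null) {
      bufferedSize += e.size();
      unackedAppends.addLast(e);
    }
    if (bufferedSize >= batchSize || (syncRequested && !unackedAppends.isEmpty())) {
      syncCount++;
      bufferedSize = 0;
    }
  }

  // Sync callback, success: drop every entry up to the acknowledged txid.
  void onSyncSucceeded(long ackedTxid) {
    while (!unackedAppends.isEmpty() && unackedAppends.peekFirst().txid() <= ackedTxid) {
      unackedAppends.pollFirst();
    }
  }

  // Sync callback, failure: push all unacked entries back to the head of toWriteAppends,
  // keeping txid order, so they are written again.
  void onSyncFailed() {
    Iterator<Entry> it = unackedAppends.descendingIterator();
    while (it.hasNext()) {
      toWriteAppends.addFirst(it.next());
    }
    unackedAppends.clear();
  }
}
```

Because both deques stay in txid order, a successful sync can drop entries from the head, and a failed one can put every unacked entry back in front of the not-yet-written ones without re-sorting.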
Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as in FSHLog.
For a normal roll request (for example, we have reached the log roll size):
- In the log roller thread, we set waitingRoll(int) to true and readyForRolling to false, and then wait on readyForRolling (see waitForSafePoint()).
- In the consumer thread, we stop polling entries from waitingConsumePayloads if waitingRoll(int) is true, and also stop writing the entries in toWriteAppends out.
  - If there is unflushed data in the writer, sync it.
  - When all outgoing sync requests are finished, i.e., unackedAppends is empty, signal the readyForRollingCond.
- Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e., we have reached a safe point. So it is now safe to replace the old writer with the new writer.
  - Set writerBroken(int) and waitingRoll(int) to false.
  - Schedule the consumer task.
  - Schedule a background task to close the old writer.
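The roll handshake is a lock/condition wait on a safe point. The sketch below is hypothetical: plain booleans and an int counter stand in for the epochAndState bits and the unackedAppends deque, and the method names only echo waitForSafePoint() and trySetReadyForRolling from this page; it is not the actual AsyncFSWAL implementation. One liberty taken: the roller also checks for an already idle consumer before it starts waiting.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the roll safe-point handshake; not the real AsyncFSWAL code.
final class SafePointSketch {
  private final Lock consumeLock = new ReentrantLock();
  private final Condition readyForRollingCond = consumeLock.newCondition();
  private boolean waitingRoll;      // stand-in for the waitingRoll state bit
  private boolean readyForRolling;
  private int unackedAppends;       // stand-in for the size of the unackedAppends deque

  // Log roller thread: announce the roll, then wait until nothing is in flight.
  boolean waitForSafePoint(long timeout, TimeUnit unit) throws InterruptedException {
    consumeLock.lock();
    try {
      waitingRoll = true;
      readyForRolling = false;
      trySetReadyForRolling(); // the consumer may already be idle
      long nanos = unit.toNanos(timeout);
      while (!readyForRolling) { // loop guards against spurious wakeups
        if (nanos <= 0) {
          return false;
        }
        nanos = readyForRollingCond.awaitNanos(nanos);
      }
      return true;
    } finally {
      consumeLock.unlock();
    }
  }

  // Consumer: write one entry out, unless a roll is pending.
  boolean appendOne() {
    consumeLock.lock();
    try {
      if (waitingRoll) {
        return false; // stop writing entries out while waiting for the roll
      }
      unackedAppends++;
      return true;
    } finally {
      consumeLock.unlock();
    }
  }

  // Sync callback: some entries were acknowledged; wake the roller if we are drained.
  void syncCompleted(int acked) {
    consumeLock.lock();
    try {
      unackedAppends -= acked;
      trySetReadyForRolling();
    } finally {
      consumeLock.unlock();
    }
  }

  // Caller must hold consumeLock.
  private void trySetReadyForRolling() {
    if (waitingRoll && unackedAppends == 0) {
      readyForRolling = true;
      readyForRollingCond.signalAll();
    }
  }

  // After the new writer is in place: clear the flag so the consumer may resume.
  void rollDone() {
    consumeLock.lock();
    try {
      waitingRoll = false;
    } finally {
      consumeLock.unlock();
    }
  }
}
```

Holding the same lock for the flag, the in-flight count and the condition means the roller can never observe "nothing in flight" while the consumer is halfway through writing an entry out.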
Nested Class Summary
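Field Summary
Modifier and Type | Field
- static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP
- static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- private final long batchSize
- private final Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass
- private final ExecutorService consumeExecutor
- private final Lock consumeLock
- private final Runnable consumer
- private final AtomicBoolean consumerScheduled
- static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
- static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- static final long DEFAULT_WAL_BATCH_SIZE
- private int epochAndState
- private final org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup
- private long fileLengthAtLastSync
- private AsyncFSOutput fsOut
- private long highestProcessedAppendTxid
- private long highestProcessedAppendTxidAtLastSync
- private static final org.slf4j.Logger LOG
- private static final int MAX_EPOCH
- private boolean readyForRolling
- private final Condition readyForRollingCond
- private static final Comparator<SyncFuture> SEQ_COMPARATOR
- private final StreamSlowMonitor streamSlowMonitor
- private final SortedSet<SyncFuture> syncFutures
- private final Deque<FSWALEntry> toWriteAppends
- private final Deque<FSWALEntry> unackedAppends
- private final com.lmax.disruptor.RingBuffer<RingBufferTruck> waitingConsumePayloads
- private final com.lmax.disruptor.Sequence waitingConsumePayloadsGatingSequence
- private final int waitOnShutdownInSeconds
- static final String WAL_BATCH_SIZE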
Fields inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
abortable, blocksize, closed, closeExecutor, conf, coprocessorHost, DEFAULT_ROLL_ON_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_TIME_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS, filenum, fs, highestSyncedTxid, highestUnsyncedTxid, implClassName, inflightWALClosures, listeners, LOG_NAME_COMPARATOR, logrollsize, MAX_LOGS, maxLogs, numEntries, ourFiles, prefixPathStr, RING_BUFFER_SLOT_COUNT, ROLL_ON_SYNC_TIME_MS, rollOnSyncNs, rollRequested, rollWriterLock, sequenceIdAccounting, shutdown, SLOW_SYNC_ROLL_INTERVAL_MS, SLOW_SYNC_ROLL_THRESHOLD, SLOW_SYNC_TIME_MS, slowSyncCheckInterval, slowSyncCount, slowSyncNs, slowSyncRollThreshold, syncFutureCache, totalLogSize, useHsync, WAL_AVOID_LOCAL_WRITES_DEFAULT, WAL_AVOID_LOCAL_WRITES_KEY, WAL_ROLL_MULTIPLIER, WAL_SHUTDOWN_WAIT_TIMEOUT_MS, WAL_SYNC_TIMEOUT_MS, walArchiveDir, walDir, walFile2Props, walFilePrefix, walFileSuffix, walShutdownTimeout, writer
Constructor Summary
- AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass)
- AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor)
Method Summary
Modifier and Type | Method | Description
- protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore): Append a set of edits to the WAL.
- private void appendAndSync
- private void closeWriter(WALProvider.AsyncWriter writer, org.apache.hadoop.fs.Path path)
- private void consume()
- protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.Path path)
- protected void doAppend(WALProvider.AsyncWriter writer, FSWALEntry entry)
- protected boolean doCheckLogLowReplication
- protected void doReplaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.AsyncWriter nextWriter): Notice that you need to clear the AbstractFSWAL.rollRequested flag in this method, as the new writer will begin to work before returning from this method.
- protected void doShutdown
- protected void doSync(boolean forceSync)
- protected void doSync(long txid, boolean forceSync)
- private static int epoch(int epochAndState)
- private int finishSync
- private int finishSyncLowerThanTxid(long txid)
- private static long getLastTxid(Deque<FSWALEntry> queue)
- (package private) int getLogReplication(): This method gets the datanode replication count for the current WAL.
- (package private) org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline(): This method gets the pipeline for the current WAL.
- private boolean isHsync(long beginTxid, long endTxid)
- private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t): Helper that marks the future as DONE and offers it back to the cache.
- private boolean shouldScheduleConsumer()
- private void sync(WALProvider.AsyncWriter writer)
- private void syncCompleted(long epochWhenSync, WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs)
- private void syncFailed(long epochWhenSync, Throwable error)
- private boolean trySetReadyForRolling
- private void waitForSafePoint()
- private static boolean waitingRoll(int epochAndState)
- private static boolean writerBroken(int epochAndState)
Methods inherited from class org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
abortCacheFlush, appendData, appendEntry, appendMarker, archive, archiveLogFile, atHeadOfRingBufferEventHandlerAppend, blockOnSync, checkLogLowReplication, close, completeCacheFlush, computeFilename, doCheckSlowSync, findRegionsToForceFlush, getCoprocessorHost, getCurrentFileName, getEarliestMemStoreSeqNum, getEarliestMemStoreSeqNum, getFilenum, getFileNumFromFileName, getFiles, getInflightWALCloseCount, getLogFileSize, getLogFileSizeIfBeingWritten, getNumLogFiles, getNumRolledLogFiles, getOldPath, getPreallocatedEventCount, getSyncFuture, getUnflushedEntriesCount, getWALArchivePath, getWriter, init, isLogRollRequested, isUnflushedEntries, logRollAndSetupWalProps, main, markClosedAndClean, postSync, registerWALActionsListener, replaceWriter, requestLogRoll, requestLogRoll, rollWriter, rollWriter, shutdown, stampSequenceIdAndPublishToRingBuffer, startCacheFlush, startCacheFlush, sync, sync, sync, sync, toString, unregisterWALActionsListener, updateStore
Field Details
- LOG
- SEQ_COMPARATOR
- WAL_BATCH_SIZE
- DEFAULT_WAL_BATCH_SIZE
- ASYNC_WAL_USE_SHARED_EVENT_LOOP
- DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP
- ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
- eventLoopGroup
- consumeExecutor
- channelClass
- consumeLock
- consumer
- hasConsumerTask
- MAX_EPOCH
- epochAndState
- readyForRolling
- readyForRollingCond
- waitingConsumePayloads
- waitingConsumePayloadsGatingSequence
- consumerScheduled
- batchSize
- fsOut
- toWriteAppends
- unackedAppends
- syncFutures
- highestProcessedAppendTxid
- fileLengthAtLastSync
- highestProcessedAppendTxidAtLastSync
- waitOnShutdownInSeconds
- streamSlowMonitor
Constructor Details
- AsyncFSWAL
  public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass) throws FailedLogCloseException, IOException
  Throws: FailedLogCloseException, IOException
- AsyncFSWAL
  public AsyncFSWAL(org.apache.hadoop.fs.FileSystem fs, Abortable abortable, org.apache.hadoop.fs.Path rootDir, String logDir, String archiveDir, org.apache.hadoop.conf.Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup eventLoopGroup, Class<? extends org.apache.hbase.thirdparty.io.netty.channel.Channel> channelClass, StreamSlowMonitor monitor) throws FailedLogCloseException, IOException
  Throws: FailedLogCloseException, IOException
Method Details
- markFutureDoneAndOffer
  Helper that marks the future as DONE and offers it back to the cache.
- waitingRoll
- writerBroken
- epoch
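The static helpers waitingRoll(int), writerBroken(int) and epoch(int) suggest that both flags and a writer epoch are packed into the single int epochAndState. The bit layout below (bit 0 for waitingRoll, bit 1 for writerBroken, the upper 30 bits for the epoch, MAX_EPOCH = 0x3FFFFFFF) is an assumption for illustration, as are the hypothetical helpers `pack`, `nextWriter` and `isStale`. The `isStale` check shows one plausible use of the epochWhenSync argument of syncCompleted and syncFailed: ignoring callbacks from a writer that has already been replaced.

```java
// Hypothetical packing of epochAndState into one int. Assumed layout (not taken from the
// real source): bit 0 = waitingRoll, bit 1 = writerBroken, bits 2..31 = writer epoch.
final class EpochAndStateSketch {
  // Assumed upper bound for a 30-bit epoch.
  static final int MAX_EPOCH = 0x3FFFFFFF;

  static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  static int epoch(int epochAndState) {
    return epochAndState >>> 2; // unsigned shift keeps a 30-bit epoch non-negative
  }

  // Hypothetical helper: build a state word from its parts.
  static int pack(int epoch, boolean writerBroken, boolean waitingRoll) {
    return (epoch << 2) | (writerBroken ? 2 : 0) | (waitingRoll ? 1 : 0);
  }

  // Hypothetical helper: a new writer gets the next epoch (wrapping after MAX_EPOCH),
  // and both flags are cleared.
  static int nextWriter(int epochAndState) {
    int current = epoch(epochAndState);
    int next = current == MAX_EPOCH ? 0 : current + 1;
    return pack(next, false, false);
  }

  // Hypothetical helper: a sync callback is stale if it was issued under an older epoch.
  static boolean isStale(long epochWhenSync, int epochAndState) {
    return epochWhenSync != epoch(epochAndState);
  }
}
```

Packing everything into one word lets a single volatile read (or a single write under a lock) observe the flags and the epoch together, so they can never be seen out of step with each other.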
- trySetReadyForRolling
- syncFailed
- syncCompleted
  private void syncCompleted(long epochWhenSync, WALProvider.AsyncWriter writer, long processedTxid, long startTimeNs)
- isHsync
- sync
- finishSyncLowerThanTxid
- finishSync
- getLastTxid
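syncFutures is a SortedSet<SyncFuture> ordered by SEQ_COMPARATOR, which turns "finish every waiter up to this txid" into a single in-order scan. In this sketch `PendingSync` is a hypothetical stand-in for SyncFuture backed by a `CompletableFuture`, and the id tie-breaker in the comparator is our own choice so that two waiters on the same txid are not collapsed by the TreeSet; the real comparator and the real return value of finishSyncLowerThanTxid may differ.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of completing pending syncs in txid order; not the real AsyncFSWAL code.
final class SyncFutureSketch {
  // Stand-in for SyncFuture: the txid it waits for, a unique id, and a completion handle.
  record PendingSync(long txid, long id, CompletableFuture<Long> done) {}

  // Sort by txid; the id tie-breaker (our choice) keeps waiters on the same txid distinct.
  static final Comparator<PendingSync> SEQ_COMPARATOR =
      Comparator.comparingLong(PendingSync::txid).thenComparingLong(PendingSync::id);

  final SortedSet<PendingSync> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // Complete every pending sync whose txid is <= the durable txid; return how many finished.
  int finishSyncLowerThanTxid(long txid) {
    int finished = 0;
    Iterator<PendingSync> it = syncFutures.iterator();
    while (it.hasNext()) {
      PendingSync f = it.next();
      if (f.txid() > txid) {
        break; // the set is sorted, so no later element can qualify
      }
      f.done().complete(txid);
      it.remove();
      finished++;
    }
    return finished;
  }
}
```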
- appendAndSync
- consume
- shouldScheduleConsumer
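shouldScheduleConsumer() exists so that many rpc handler threads do not each submit a consumer task to the single-threaded executor. A common way to get that guarantee is a compare-and-set on a flag such as consumerScheduled; the sketch below assumes that approach, uses a hypothetical queue of txids in place of the ring buffer, and omits any writer-state checks the real method may also perform.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "schedule the consumer at most once"; not the real AsyncFSWAL code.
final class ConsumerSchedulingSketch {
  private final ConcurrentLinkedQueue<Long> payloads = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
  private final ExecutorService consumeExecutor = Executors.newSingleThreadExecutor();
  final AtomicInteger consumed = new AtomicInteger();
  final AtomicInteger tasksSubmitted = new AtomicInteger();

  // Caller thread: enqueue, then submit a consumer task only if none is scheduled.
  void append(long txid) {
    payloads.offer(txid);
    if (shouldScheduleConsumer()) {
      tasksSubmitted.incrementAndGet();
      consumeExecutor.execute(this::consume);
    }
  }

  private boolean shouldScheduleConsumer() {
    return consumerScheduled.compareAndSet(false, true); // only the CAS winner submits
  }

  private void consume() {
    for (;;) {
      while (payloads.poll() != null) {
        consumed.incrementAndGet();
      }
      consumerScheduled.set(false);
      // Re-check: a producer that lost the CAS before set(false) relies on this task.
      if (payloads.isEmpty() || !consumerScheduled.compareAndSet(false, true)) {
        return;
      }
    }
  }

  void shutdownAndWait() throws InterruptedException {
    consumeExecutor.shutdown();
    consumeExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }
}
```

The re-check after clearing the flag is what prevents an entry from being stranded: a producer whose CAS failed while the consumer was still draining can count on the consumer either seeing its entry or handing the flag to a newly submitted task.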
- append
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException
  Description copied from class: AbstractFSWAL
  Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction completes, BUT on return this edit must have its region edit/sequence id assigned, else it messes up our unification of mvcc and sequenceid. On return, key will have the region edit/sequence id filled in. NOTE: This append, at a time that is usually after this call returns, starts an mvcc transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment time, we stamp all the passed-in Cells inside WALEdit with their sequenceId. You must 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within which this append lives, along with any subsequent operations like sync or update of memstore, etc. Get the WriteEntry to pass to mvcc out of the passed-in WALKey walKey parameter. Be warned that the WriteEntry is not immediately available on return from this method. It WILL be available subsequent to a sync of this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
  Specified by: append in class AbstractFSWAL<WALProvider.AsyncWriter>
  Parameters:
  - hri - the regioninfo associated with append
  - key - Modified by this call; we add to it this edit's region edit/sequence id.
  - edits - Edits to append. MAY CONTAIN NO EDITS for the case where we want to get an edit sequence id that is after all currently appended edits.
  - inMemstore - Always true except for the case where we are writing a region event meta marker edit, for example, a compaction completion record into the WAL or noting a Region Open event. In these cases the entry is just so we can finish an unfinished compaction after a crash when the new Server reads the WAL on recovery, etc. These transition event 'Markers' do not go via the memstore. When inMemstore is false, we presume a Marker event edit.
  Returns: a 'transaction id'; key will have the region edit/sequence id in it.
  Throws: IOException
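The mvcc contract in the append description (sequence id filled in later, WriteEntry available after sync, completion in a finally block) is easier to see in a toy model. Everything here is hypothetical: `WriteEntry` and `Key` stand in for the mvcc WriteEntry and WALKeyImpl, the write entry is filled in synchronously rather than by the consumer, and `completeMvcc` only advances a counter instead of calling MultiVersionConcurrencyControl#complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy model of the append/mvcc contract; none of these types are HBase classes.
final class AppendContractSketch {
  // Stand-in for the mvcc WriteEntry that carries the assigned sequence id.
  static final class WriteEntry {
    final long seqId;
    WriteEntry(long seqId) { this.seqId = seqId; }
  }

  // Stand-in for WALKeyImpl: its write entry is filled in some time after append().
  static final class Key {
    final CompletableFuture<WriteEntry> writeEntry = new CompletableFuture<>();
  }

  private final AtomicLong nextSeq = new AtomicLong(1);
  long completedUpTo = 0; // stand-in for how far mvcc has advanced

  // Toy append(): returns a txid; here the seq id is stamped immediately for simplicity.
  long append(Key key) {
    long txid = nextSeq.getAndIncrement();
    key.writeEntry.complete(new WriteEntry(txid));
    return txid;
  }

  void sync(long txid) {
    // no-op: pretend the WAL is now durable up to txid
  }

  void completeMvcc(WriteEntry we) {
    completedUpTo = Math.max(completedUpTo, we.seqId);
  }

  // The caller pattern: append and sync inside try, complete the mvcc entry in finally.
  long writeOne() throws Exception {
    Key key = new Key();
    try {
      long txid = append(key);
      sync(txid);
      return key.writeEntry.get().seqId; // available once this append has been synced
    } finally {
      WriteEntry we = key.writeEntry.getNow(null); // null if no seq id was ever assigned
      if (we != null) {
        completeMvcc(we);
      }
    }
  }
}
```

Completing in `finally` means a failed sync still releases the mvcc transaction, which is the "otherwise mvcc will get stuck" case the description warns about.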
- doSync
  Specified by: doSync in class AbstractFSWAL<WALProvider.AsyncWriter>
  Throws: IOException
- doSync
  Specified by: doSync in class AbstractFSWAL<WALProvider.AsyncWriter>
  Throws: IOException
- createWriterInstance
  protected WALProvider.AsyncWriter createWriterInstance(org.apache.hadoop.fs.Path path) throws IOException
  Specified by: createWriterInstance in class AbstractFSWAL<WALProvider.AsyncWriter>
  Throws: IOException
- waitForSafePoint
- closeWriter
- doReplaceWriter
  protected void doReplaceWriter(org.apache.hadoop.fs.Path oldPath, org.apache.hadoop.fs.Path newPath, WALProvider.AsyncWriter nextWriter) throws IOException
  Description copied from class: AbstractFSWAL
  Notice that you need to clear the AbstractFSWAL.rollRequested flag in this method, as the new writer will begin to work before returning from this method. If we clear the flag after returning from this call, we may miss a roll request. The implementation class should choose a proper place to clear the AbstractFSWAL.rollRequested flag so that we do not miss a roll request, typically before you start writing to the new writer.
  Specified by: doReplaceWriter in class AbstractFSWAL<WALProvider.AsyncWriter>
  Throws: IOException
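The ordering requirement on AbstractFSWAL.rollRequested can be shown with an AtomicBoolean. In this hypothetical sketch, `requestLogRoll` only issues a request when it flips the flag from false to true, and the `Runnable` stands in for the new writer's first writes raising a roll request; neither method is the real AbstractFSWAL code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of why rollRequested must be cleared before the new writer
// starts accepting writes. Not the real AbstractFSWAL/AsyncFSWAL code.
final class RollRequestSketch {
  final AtomicBoolean rollRequested = new AtomicBoolean(false);
  int rollRequestsIssued = 0;

  // Only the caller that flips the flag from false to true actually issues a request.
  void requestLogRoll() {
    if (rollRequested.compareAndSet(false, true)) {
      rollRequestsIssued++;
    }
  }

  // Safe order: clear the flag, then let the new writer begin work.
  void replaceWriterClearingFirst(Runnable newWriterStartsWorking) {
    rollRequested.set(false);
    newWriterStartsWorking.run();
  }

  // Unsafe order: the new writer works first, then the flag is cleared afterwards,
  // silently dropping any request it raised in between.
  void replaceWriterClearingLast(Runnable newWriterStartsWorking) {
    newWriterStartsWorking.run();
    rollRequested.set(false);
  }
}
```

With the flag cleared first, the request raised by the new writer is recorded; clearing it last swallows that request, which is exactly the missed roll the description warns about.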
- doShutdown
  Specified by: doShutdown in class AbstractFSWAL<WALProvider.AsyncWriter>
  Throws: IOException
- doAppend
  Specified by: doAppend in class AbstractFSWAL<WALProvider.AsyncWriter>
- getPipeline
  org.apache.hadoop.hdfs.protocol.DatanodeInfo[] getPipeline()
  Description copied from class: AbstractFSWAL
  This method gets the pipeline for the current WAL.
  Specified by: getPipeline in class AbstractFSWAL<WALProvider.AsyncWriter>
- getLogReplication
  int getLogReplication()
  Description copied from class: AbstractFSWAL
  This method gets the datanode replication count for the current WAL.
  Specified by: getLogReplication in class AbstractFSWAL<WALProvider.AsyncWriter>
- doCheckLogLowReplication
  Specified by: doCheckLogLowReplication in class AbstractFSWAL<WALProvider.AsyncWriter>