/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' plays the same role as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically the RPC handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as the txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details.
 * </li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}.</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}.</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
 * wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. Unlike FSHLog, if we hit a sync error we
 * will open a new writer, rewrite the unacked entries to the new writer and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing out the entries in {@link #toWriteAppends}.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty, signal
 * the {@link #readyForRollingCond}.</li>
 * <li>Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point, so it is safe to replace the old writer with the new one.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for the
 * safe point stage.
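 * <p>
 * As a rough, illustrative sketch of how a caller drives this class (the variable values below are
 * placeholders, and the exact public entry points depend on the WAL interface of the HBase version
 * in use):
 *
 * <pre>
 * // append publishes the entry to the ring buffer and returns the txid assigned to it
 * long txid = wal.appendData(regionInfo, walKey, walEdit);
 * // sync blocks until the consumer task has written and acked everything up to that txid
 * wal.sync(txid);
 * </pre>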
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // The lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // The second-lowest bit is writerBroken, which means the current writer is broken and a
  // rollWriter is needed.
  // All other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the one in use when the sync was issued.
  // Notice that modifications to this field are only allowed under the protection of consumeLock.
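  // For example, epochAndState == 0b101 means epoch 1, writerBroken == false and
  // waitingRoll == true (an illustrative value, not one taken from a real run).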
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

  private final StreamSlowMonitor streamSlowMonitor;

  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
  }

  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws FailedLogCloseException, IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    this.streamSlowMonitor = monitor;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Can not get task queue of " + consumeExecutor
            + ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
          .setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()).setDaemon(true).build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so that our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  /**
   * Helper that marks the future as DONE and offers it back to the cache.
   */
  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
    future.done(txid, t);
    syncFutureCache.offer(future);
  }

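  // The static helpers below decode the state packed into an epochAndState value; they take the
  // value as a parameter so callers can read the volatile field once and decode it consistently.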
  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // return whether we have successfully set readyForRolling to true.
  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // The waitingRoll bit is read from the volatile epochAndState and unackedAppends is only
    // accessed inside the consumer task, so it is safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed this above).
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

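  // Called back on the consume executor when a sync on the writer fails: mark the current writer
  // as broken, move the unacked entries back to the front of toWriteAppends, and request a log
  // roll unless one is already in progress.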
  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer, or this
        // is still the current writer but we have already marked it as broken and requested a
        // roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter, so just tell the roller thread
        // that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll(ERROR);
    }
  }

  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
    long startTimeNs) {
    // Please see the last several comments on HBASE-22761. It is possible that we get a
    // syncCompleted which acks a previous sync request after we have received a syncFailed on the
    // same writer. So here we also check the epoch and state: if the epoch has already been
    // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
    // just skip, to avoid messing up the state or accidentally releasing some WAL entries and
    // causing data corruption.
    // The syncCompleted call is on the critical write path, so we should try our best to make it
    // fast. So here we do not hold consumeLock, to increase performance. This is safe because
    // there are only 3 possible situations:
    // 1. For the normal case, the only place where we change epochAndState is when rolling the
    // writer. Before the roll actually happens, we will only change the state to waitingRoll,
    // which is a different bit from writerBroken, and when we actually change the epoch, we can
    // make sure that there are no outgoing sync requests. So we will always pass the check here
    // and there is no problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
    // situation as #1.
    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
    // only 2 possible situations:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch is changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we always update epochAndState
    // as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
    int epochAndState = this.epochAndState;
    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
      LOG.warn("Got a sync complete call after the writer is broken, skip");
      return;
    }
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      if (entry.getTxid() <= processedTxid) {
        entry.release();
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync());
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer will
      // be closed soon.
      return;
    }
    // If we haven't already requested a roll, check if we have exceeded logrollsize
    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
          + ", logrollsize=" + logrollsize);
      }
      requestLogRoll(SIZE);
    }
  }

  // find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures then just use the default.
  private boolean isHsync(long beginTxid, long endTxid) {
    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
      new SyncFuture().reset(endTxid + 1, false));
    if (futures.isEmpty()) {
      return useHsync;
    }
    for (SyncFuture future : futures) {
      if (future.isForceSync()) {
        return true;
      }
    }
    return false;
  }

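  // Issue a sync on the given writer for everything appended so far and register a callback on the
  // consume executor that routes the result to syncCompleted or syncFailed for the current epoch.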
  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    boolean shouldUseHsync =
      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(shouldUseHsync), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

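  // Complete and remove all pending sync futures whose txid is less than or equal to the given
  // txid, returning how many were finished.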
  private int finishSyncLowerThanTxid(long txid) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        markFutureDoneAndOffer(sync, txid, null);
        iter.remove();
        finished++;
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
  private int finishSync() {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // There are also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          sync.done(maxSyncTxid, null);
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid is between
        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid);
    }
  }

  // confirm non-empty before calling
  private static long getLastTxid(Deque<FSWALEntry> queue) {
    return queue.peekLast().getTxid();
  }

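  // Drain toWriteAppends into the writer, move the written entries into unackedAppends, and decide
  // whether to issue a sync now (batch size reached) or leave the decision to the caller.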
  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request was not yet queued when we issued the last sync, so check here to see
    // if we can finish some sync futures.
    finishSync();
    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
    // threaded, so this could save us some cycles
    boolean addedToUnackedAppends = false;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync we add the entries in unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in unackedAppends.
        if (
          addedToUnackedAppends || unackedAppends.isEmpty()
            || getLastTxid(unackedAppends) < entry.getTxid()
        ) {
          unackedAppends.addLast(entry);
          addedToUnackedAppends = true;
        }
        // See HBASE-25905: here we need to make sure that we always write out all the entries in
        // unackedAppends. The code in the consume method assumes that the entries in
        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
        // is not empty, we can just return, as later there will be a syncCompleted call to clear
        // unackedAppends, or a syncFailed to lead us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and then (may) lead to a big refactoring. So let's
        // use this way to fix it first; we can optimize later.
        if (
          writer.getLength() - fileLengthAtLastSync >= batchSize
            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
        ) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because we have reached the batch size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance highestSyncedTxid since we may only have
      // stamped some region sequence ids.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync();
        trySetReadyForRolling();
      }
      return;
    }
    // Reaching here means that we have some unsynced data but haven't reached the batch size yet.
    // We will not issue a sync directly here even if there are sync requests, because we may have
    // some new data in the ringbuffer, so let's just return and delay the decision of whether to
    // issue a sync to the caller method.
  }

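  // The consumer task: runs on consumeExecutor, drains the ring buffer into toWriteAppends and
  // syncFutures, calls appendAndSync, and then decides whether it needs to reschedule itself.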
  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in appendAndSync we do not hold the consumeLock. Things may happen
        // like:
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees that consumerScheduled is true
        // and gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we are giving up consuming, so if there is some unsynced data we need to issue a sync.
          if (
            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
          ) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }

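  // Returns true if the caller should schedule the consumer task: the writer must be usable (not
  // broken and not waiting for a roll) and no consumer task may already be scheduled.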
  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  @Override
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException {
    long txid =
      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  protected void doSync(boolean forceSync) throws IOException {
    long txid = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(txid);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(txid);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected void doSync(long txid, boolean forceSync) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    // here we do not use ring buffer sequence as txid
    long sequence = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(sequence);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass, streamSlowMonitor);
  }

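  // Block the log roller thread until the consumer task reaches a safe point, i.e. the writer has
  // no outstanding appends or syncs, so it can be replaced. Sets the waitingRoll bit and waits on
  // readyForRollingCond; returns immediately if the writer is already broken or absent.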
  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

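  // Close the given writer asynchronously on closeExecutor; the writer is tracked in
  // inflightWALClosures while the close is in progress and removed once it finishes.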
  private void closeWriter(AsyncWriter writer, Path path) {
    inflightWALClosures.put(path.getName(), writer);
    closeExecutor.execute(() -> {
      try {
        writer.close();
      } catch (IOException e) {
        LOG.warn("close old writer failed", e);
      } finally {
        // call this even if the above close fails, as there is no other chance for us to set
        // closed to true; it will not cause big problems.
        markClosedAndClean(path);
        inflightWALClosures.remove(path.getName());
      }
    });
  }

  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
    throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    // We will call rollWriter in the init method, where we want to create the first writer and
    // obviously the previous writer is null, so here we need this null check. The reason we must
    // call logRollAndSetupWalProps before closeWriter is that markClosedAndClean is called after
    // the writer is closed asynchronously, so we need to make sure the WALProps is put into
    // walFile2Props before markClosedAndClean is called.
    if (writer != null) {
      long oldFileLen = writer.getLength();
      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
      closeWriter(writer, oldPath);
    } else {
      logRollAndSetupWalProps(oldPath, newPath, 0);
    }

    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter(this.writer, getOldPath());
    this.writer = null;
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
          + " the close of the async writer doesn't complete."
          + " Please check the status of the underlying filesystem"
          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
          + "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for close of async writer is interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // Unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to the
    // datanodes, so typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}