001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.wal;
019
020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
023import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
024import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
025import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
026import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
027import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
028import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
029
030import com.google.errorprone.annotations.RestrictedApi;
031import com.lmax.disruptor.RingBuffer;
032import com.lmax.disruptor.Sequence;
033import com.lmax.disruptor.Sequencer;
034import io.opentelemetry.api.trace.Span;
035import java.io.FileNotFoundException;
036import java.io.IOException;
037import java.io.InterruptedIOException;
038import java.lang.management.MemoryType;
039import java.net.URLEncoder;
040import java.nio.charset.StandardCharsets;
041import java.util.ArrayDeque;
042import java.util.ArrayList;
043import java.util.Arrays;
044import java.util.Comparator;
045import java.util.Deque;
046import java.util.Iterator;
047import java.util.List;
048import java.util.Map;
049import java.util.OptionalLong;
050import java.util.Set;
051import java.util.SortedSet;
052import java.util.TreeSet;
053import java.util.concurrent.Callable;
054import java.util.concurrent.CompletableFuture;
055import java.util.concurrent.ConcurrentHashMap;
056import java.util.concurrent.ConcurrentNavigableMap;
057import java.util.concurrent.ConcurrentSkipListMap;
058import java.util.concurrent.CopyOnWriteArrayList;
059import java.util.concurrent.ExecutionException;
060import java.util.concurrent.ExecutorService;
061import java.util.concurrent.Executors;
062import java.util.concurrent.Future;
063import java.util.concurrent.LinkedBlockingQueue;
064import java.util.concurrent.ThreadPoolExecutor;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.TimeoutException;
067import java.util.concurrent.atomic.AtomicBoolean;
068import java.util.concurrent.atomic.AtomicInteger;
069import java.util.concurrent.atomic.AtomicLong;
070import java.util.concurrent.locks.Condition;
071import java.util.concurrent.locks.Lock;
072import java.util.concurrent.locks.ReentrantLock;
073import java.util.function.Supplier;
074import java.util.stream.Collectors;
075import org.apache.commons.lang3.mutable.MutableLong;
076import org.apache.hadoop.conf.Configuration;
077import org.apache.hadoop.fs.FileStatus;
078import org.apache.hadoop.fs.FileSystem;
079import org.apache.hadoop.fs.Path;
080import org.apache.hadoop.fs.PathFilter;
081import org.apache.hadoop.hbase.Abortable;
082import org.apache.hadoop.hbase.Cell;
083import org.apache.hadoop.hbase.HBaseConfiguration;
084import org.apache.hadoop.hbase.HConstants;
085import org.apache.hadoop.hbase.PrivateCellUtil;
086import org.apache.hadoop.hbase.client.ConnectionUtils;
087import org.apache.hadoop.hbase.client.RegionInfo;
088import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
089import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
090import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
091import org.apache.hadoop.hbase.ipc.RpcServer;
092import org.apache.hadoop.hbase.ipc.ServerCall;
093import org.apache.hadoop.hbase.log.HBaseMarkers;
094import org.apache.hadoop.hbase.regionserver.HRegion;
095import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
096import org.apache.hadoop.hbase.trace.TraceUtil;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.CommonFSUtils;
099import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
100import org.apache.hadoop.hbase.util.Pair;
101import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
102import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
103import org.apache.hadoop.hbase.wal.WAL;
104import org.apache.hadoop.hbase.wal.WALEdit;
105import org.apache.hadoop.hbase.wal.WALFactory;
106import org.apache.hadoop.hbase.wal.WALKeyImpl;
107import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
108import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
109import org.apache.hadoop.hbase.wal.WALSplitter;
110import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
111import org.apache.hadoop.util.StringUtils;
112import org.apache.yetus.audience.InterfaceAudience;
113import org.slf4j.Logger;
114import org.slf4j.LoggerFactory;
115
116import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
117import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
118import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
119
120/**
121 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
122 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
123 * This is done internal to the implementation.
124 * <p>
125 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
126 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
127 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
128 * flushed out to hfiles, and what is yet in WAL and in memory only.
129 * <p>
130 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
131 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
132 * (smaller) than the most-recent flush.
133 * <p>
 * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read;
 * call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
 * replication, where we may want to tail the active WAL file.
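 * <p>
 * A minimal reading sketch (the {@code WALStreamReader} type, the {@code next()} loop shape, and
 * the {@code fs}/{@code walPath} variables are illustrative assumptions; check the reader type
 * returned by your version of {@link WALFactory}):
 *
 * <pre>
 * try (WALStreamReader reader = WALFactory.createStreamReader(fs, walPath)) {
 *   for (WAL.Entry entry; (entry = reader.next()) != null;) {
 *     // entry.getKey() carries region/sequence id information; entry.getEdit() carries the cells
 *   }
 * }
 * </pre>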
 * <h2>Failure Semantic</h2> If an exception occurs on append or sync, roll the WAL because the
 * current WAL is now a lame duck; any further appends or syncs will also fail with the same
 * original exception. If we have made successful appends to the WAL and are then unable to sync
 * them, our current semantic is to return an error to the client indicating that the appends
 * failed, but also to abort the current context, usually the hosting server. We need to replay the
 * WALs. <br>
 * TODO: Change this semantic. A roll of the WAL may be sufficient as long as we have flagged the
 * client that the append failed. <br>
 * TODO: replication may pick up these last edits even though they have been marked as a failed
 * append (need to keep our own file lengths, not rely on HDFS).
146 */
147@InterfaceAudience.Private
148public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
149  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
150
151  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
152    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);
153
154  private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";
155  private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;
156  /** Don't log blocking regions more frequently than this. */
157  private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
158
159  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
160  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
161  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
162  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
163  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
164    "hbase.regionserver.wal.slowsync.roll.threshold";
165  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
166  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
167    "hbase.regionserver.wal.slowsync.roll.interval.ms";
168  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute
169
170  public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
171  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
172
173  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
174
175  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
176
177  public static final String RING_BUFFER_SLOT_COUNT =
178    "hbase.regionserver.wal.disruptor.event.count";
179
180  public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
181  public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;
182
183  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
184  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
185
186  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
187    "hbase.regionserver.wal.avoid-local-writes";
188  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;
189
190  /**
191   * file system instance
192   */
193  protected final FileSystem fs;
194
195  /**
196   * WAL directory, where all WAL files would be placed.
197   */
198  protected final Path walDir;
199
200  private final FileSystem remoteFs;
201
202  private final Path remoteWALDir;
203
204  /**
205   * dir path where old logs are kept.
206   */
207  protected final Path walArchiveDir;
208
209  /**
210   * Matches just those wal files that belong to this wal instance.
211   */
212  protected final PathFilter ourFiles;
213
214  /**
215   * Prefix of a WAL file, usually the region server name it is hosted on.
216   */
217  protected final String walFilePrefix;
218
219  /**
220   * Suffix included on generated wal file names
221   */
222  protected final String walFileSuffix;
223
224  /**
225   * Prefix used when checking for wal membership.
226   */
227  protected final String prefixPathStr;
228
229  protected final WALCoprocessorHost coprocessorHost;
230
231  /**
232   * conf object
233   */
234  protected final Configuration conf;
235
236  protected final Abortable abortable;
237
238  /** Listeners that are called on WAL events. */
239  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();
240
241  /** Tracks the logs in the process of being closed. */
242  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();
243
244  /**
245   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
246   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
247   * facility for answering questions such as "Is it safe to GC a WAL?".
248   */
249  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
250
  /** A slow sync will be logged; a very slow sync will cause the WAL to be rolled. */
252  protected final long slowSyncNs, rollOnSyncNs;
253  protected final int slowSyncRollThreshold;
254  protected final int slowSyncCheckInterval;
255  protected final AtomicInteger slowSyncCount = new AtomicInteger();
256
257  private final long walSyncTimeoutNs;
258
259  private final long walTooOldNs;
260
261  // If > than this size, roll the log.
262  protected final long logrollsize;
263
264  /**
265   * Block size to use writing files.
266   */
267  protected final long blocksize;
268
  /*
   * If there are more than this many logs, force a flush of the oldest region so that its oldest
   * edit goes to disk. If there are too many logs and we crash, replay will take forever. Keep the
   * number of logs tidy.
   */
273  protected final int maxLogs;
274
275  protected final boolean useHsync;
276
277  /**
278   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
279   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
280   * warning when it thinks synchronized controls writer thread safety. It is held when we are
281   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
282   * not.
283   */
284  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);
285
286  // The timestamp (in ms) when the log file was created.
287  protected final AtomicLong filenum = new AtomicLong(-1);
288
289  // Number of transactions in the current Wal.
290  protected final AtomicInteger numEntries = new AtomicInteger(0);
291
292  /**
293   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
294   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
295   * corresponding entry in queue.
296   */
297  protected volatile long highestUnsyncedTxid = -1;
298
299  /**
300   * Updated to the transaction id of the last successful sync call. This can be less than
301   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
302   * for it.
303   */
304  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);
305
306  /**
   * The total size of rolled WAL files that have not yet been archived.
308   */
309  protected final AtomicLong totalLogSize = new AtomicLong(0);
310  /**
311   * Current log file.
312   */
313  volatile W writer;
314
  // Last time we checked for low replication on the WAL pipeline
316  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();
317
318  // Last time we asked to roll the log due to a slow sync
319  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
320
321  protected volatile boolean closed = false;
322
323  protected final AtomicBoolean shutdown = new AtomicBoolean(false);
324
325  protected final long walShutdownTimeout;
326
327  private long nextLogTooOldNs = System.nanoTime();
328
329  /**
330   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
331   * an IllegalArgumentException if used to compare paths from different wals.
332   */
333  final Comparator<Path> LOG_NAME_COMPARATOR =
334    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));
335
336  private static final class WALProps {
337
338    /**
339     * Map the encoded region name to the highest sequence id.
340     * <p/>
341     * Contains all the regions it has an entry for.
342     */
343    private final Map<byte[], Long> encodedName2HighestSequenceId;
344
345    /**
346     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
347     * subclasses.
348     */
349    private final long logSize;
350
351    /**
     * The nanoTime of the log rolling, used to determine how much time has passed since the roll.
353     */
354    private final long rollTimeNs;
355
356    /**
357     * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the
358     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
359     * for safety.
360     */
361    private volatile boolean closed = false;
362
363    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
364      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
365      this.logSize = logSize;
366      this.rollTimeNs = System.nanoTime();
367    }
368  }
369
370  /**
371   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
372   * (contained in the log file name).
373   */
374  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
375    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
376
377  /**
378   * A cache of sync futures reused by threads.
379   */
380  protected final SyncFutureCache syncFutureCache;
381
382  /**
383   * The class name of the runtime implementation, used as prefix for logging/tracing.
384   * <p>
385   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
386   * refer to HBASE-17676 for more details
387   * </p>
388   */
389  protected final String implClassName;
390
391  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
392
393  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
394    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
395
  // Run in the caller's thread if we get a RejectedExecutionException, to avoid aborting the
  // region server in that case. Usually this should not happen, but let's make it more robust.
398  private final ExecutorService logArchiveExecutor =
399    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
400      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
401      new ThreadPoolExecutor.CallerRunsPolicy());
402
403  private final int archiveRetries;
404
405  protected ExecutorService consumeExecutor;
406
407  private final Lock consumeLock = new ReentrantLock();
408
409  protected final Runnable consumer = this::consume;
410
411  // check if there is already a consumer task in the event loop's task queue
412  protected Supplier<Boolean> hasConsumerTask;
413
414  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // The lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // The second-lowest bit is writerBroken, which means the current writer is broken and rollWriter
  // is needed.
  // All other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one as when you issued the sync.
  // Notice that modification to this field is only allowed under the protection of consumeLock.
422  private volatile int epochAndState;
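  // Purely illustrative sketch of the packing described above (assumed helper logic, not
  // necessarily the exact accessors used elsewhere):
  //   boolean waitingRoll = (epochAndState & 1) != 0;
  //   boolean writerBroken = ((epochAndState >>> 1) & 1) != 0;
  //   int epoch = epochAndState >>> 2;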
423
424  private boolean readyForRolling;
425
426  private final Condition readyForRollingCond = consumeLock.newCondition();
427
428  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
429
430  private final Sequence waitingConsumePayloadsGatingSequence;
431
432  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
433
434  private final long batchSize;
435
436  protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
437
438  protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();
439
440  protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);
441
442  // the highest txid of WAL entries being processed
443  protected long highestProcessedAppendTxid;
444
445  // file length when we issue last sync request on the writer
446  private long fileLengthAtLastSync;
447
448  private long highestProcessedAppendTxidAtLastSync;
449
450  private int waitOnShutdownInSeconds;
451
452  private String waitOnShutdownInSecondsConfigKey;
453
454  protected boolean shouldShutDownConsumeExecutorWhenClose = true;
455
456  private volatile boolean skipRemoteWAL = false;
457
458  private volatile boolean markerEditOnly = false;
459
460  public long getFilenum() {
461    return this.filenum.get();
462  }
463
464  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
466   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
467   * the filename is created with the {@link #computeFilename(long filenum)} method.
468   * @return timestamp, as in the log file name.
469   */
470  protected long getFileNumFromFileName(Path fileName) {
471    checkNotNull(fileName, "file name can't be null");
472    if (!ourFiles.accept(fileName)) {
473      throw new IllegalArgumentException(
474        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
475    }
476    final String fileNameString = fileName.toString();
477    String chompedPath = fileNameString.substring(prefixPathStr.length(),
478      (fileNameString.length() - walFileSuffix.length()));
479    return Long.parseLong(chompedPath);
480  }
481
482  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
483    checkArgument(logRollSize > 0,
484      "The log roll size cannot be zero or negative when calculating max log files, "
485        + "current value is " + logRollSize);
486    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
487    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
488  }
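  // Illustrative example (assumed numbers): with a 4 GB global memstore limit and a 128 MB log
  // roll size, this returns (4096 MB * 2) / 128 MB = 64; the constructor below takes
  // Math.max(32, ...) of this value as the default for MAX_LOGS.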
489
490  // must be power of 2
491  protected final int getPreallocatedEventCount() {
492    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
493    // be stuck and make no progress if the buffer is filled with appends only and there is no
494    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
495    // before they return.
496    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
498    int floor = Integer.highestOneBit(preallocatedEventCount);
499    if (floor == preallocatedEventCount) {
500      return floor;
501    }
502    // max capacity is 1 << 30
503    if (floor >= 1 << 29) {
504      return 1 << 30;
505    }
506    return floor << 1;
507  }
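  // Illustrative examples (assumed inputs): 16384 is already a power of two and is returned
  // unchanged, while 20000 has a highest one bit of 16384 and is rounded up to 32768.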
508
509  protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds,
510    String waitOnShutdownInSecondsConfigKey) {
511    this.waitOnShutdownInSeconds = waitOnShutdownInSeconds;
512    this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey;
513  }
514
515  protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir,
516    final String prefix) {
517    ThreadPoolExecutor threadPool =
518      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
519        new ThreadFactoryBuilder().setNameFormat(walType + "-%d-" + rootDir.toString() + "-prefix:"
520          + (prefix == null ? "default" : prefix).replace("%", "%%")).setDaemon(true).build());
521    hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
522    consumeExecutor = threadPool;
523    this.shouldShutDownConsumeExecutorWhenClose = true;
524  }
525
526  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
527    final String logDir, final String archiveDir, final Configuration conf,
528    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
529    final String suffix, FileSystem remoteFs, Path remoteWALDir)
530    throws FailedLogCloseException, IOException {
531    this.fs = fs;
532    this.walDir = new Path(rootDir, logDir);
533    this.walArchiveDir = new Path(rootDir, archiveDir);
534    this.conf = conf;
535    this.abortable = abortable;
536    this.remoteFs = remoteFs;
537    this.remoteWALDir = remoteWALDir;
538
539    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
540      throw new IOException("Unable to mkdir " + walDir);
541    }
542
543    if (!fs.exists(this.walArchiveDir)) {
544      if (!fs.mkdirs(this.walArchiveDir)) {
545        throw new IOException("Unable to mkdir " + this.walArchiveDir);
546      }
547    }
548
549    // If prefix is null||empty then just name it wal
550    this.walFilePrefix = prefix == null || prefix.isEmpty()
551      ? "wal"
552      : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name());
    // we only correctly differentiate suffixes when numeric ones start with '.'
554    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
555      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
556        + "' but instead was '" + suffix + "'");
557    }
558    // Now that it exists, set the storage policy for the entire directory of wal files related to
559    // this FSHLog instance
560    String storagePolicy =
561      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
562    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
563    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
564    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
565
566    this.ourFiles = new PathFilter() {
567      @Override
568      public boolean accept(final Path fileName) {
569        // The path should start with dir/<prefix> and end with our suffix
570        final String fileNameString = fileName.toString();
571        if (!fileNameString.startsWith(prefixPathStr)) {
572          return false;
573        }
574        if (walFileSuffix.isEmpty()) {
575          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
576          return org.apache.commons.lang3.StringUtils
577            .isNumeric(fileNameString.substring(prefixPathStr.length()));
578        } else if (!fileNameString.endsWith(walFileSuffix)) {
579          return false;
580        }
581        return true;
582      }
583    };
584
585    if (failIfWALExists) {
586      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
587      if (null != walFiles && 0 != walFiles.length) {
588        throw new IOException("Target WAL already exists within directory " + walDir);
589      }
590    }
591
592    // Register listeners. TODO: Should this exist anymore? We have CPs?
593    if (listeners != null) {
594      for (WALActionsListener i : listeners) {
595        registerWALActionsListener(i);
596      }
597    }
598    this.coprocessorHost = new WALCoprocessorHost(this, conf);
599
    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so the WAL rolls before we get to the end-of-block (block transitions
    // cost some latency). In hbase-1 we did this differently: we scheduled a roll when we hit 95%
    // of the block size, but experience from the field has it that this was not enough time for
    // the roll to happen before end-of-block. So that the new accounting makes WALs of about the
    // same size as those made in hbase-1 (to prevent surprise), we now default the block size to
    // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
    // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
608    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
609    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
610    this.logrollsize = (long) (this.blocksize * multiplier);
611    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
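    // Illustrative example (assumed sizes): with a 128 MB DFS block size, the WAL block size
    // defaults to 2x = 256 MB, the default 0.5 multiplier rolls the WAL at ~128 MB, and the
    // default maxLogs is Math.max(32, calculateMaxLogFiles(conf, 128 MB)).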
612
613    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
614      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
615      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
616      + ", maxLogs=" + this.maxLogs);
617    this.slowSyncNs =
618      TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));
619    this.rollOnSyncNs = TimeUnit.MILLISECONDS
620      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
621    this.slowSyncRollThreshold =
622      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
623    this.slowSyncCheckInterval =
624      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
625    this.walSyncTimeoutNs =
626      TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));
627    this.syncFutureCache = new SyncFutureCache(conf);
628    this.implClassName = getClass().getSimpleName();
629    this.walTooOldNs = TimeUnit.SECONDS
630      .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
631    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
632    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
633    this.walShutdownTimeout =
634      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
635
    int preallocatedEventCount = conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
638    waitingConsumePayloads =
639      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
640    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
641    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
642
    // increase the ring buffer sequence so that our txid starts from 1
644    waitingConsumePayloads.publish(waitingConsumePayloads.next());
645    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
646
647    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
648  }
649
650  /**
651   * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.
652   */
653  @Override
654  public void init() throws IOException {
655    rollWriter();
656  }
657
658  @Override
659  public void registerWALActionsListener(WALActionsListener listener) {
660    this.listeners.add(listener);
661  }
662
663  @Override
664  public boolean unregisterWALActionsListener(WALActionsListener listener) {
665    return this.listeners.remove(listener);
666  }
667
668  @Override
669  public WALCoprocessorHost getCoprocessorHost() {
670    return coprocessorHost;
671  }
672
673  @Override
674  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
675    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
676  }
677
678  @Override
679  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
680    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
681  }
682
683  @Override
684  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
685    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
686  }
687
688  @Override
689  public void abortCacheFlush(byte[] encodedRegionName) {
690    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
691  }
692
693  @Override
694  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring out whether we should flush or not because our
    // sequenceids are too old. It is also used when reporting our oldest sequenceid to the master,
    // for use in figuring out what edits can be skipped during log recovery.
    // getEarliestMemStoreSequenceId from this.sequenceIdAccounting looks first in
    // flushingOldestStoreSequenceIds, the currently flushing sequence ids, and if anything is
    // found there, it returns these. This is the right thing to do when reporting oldest
    // sequenceids to the master; we won't skip edits if we crash during the flush. For figuring
    // out what to flush, we might get requeued if our sequence id is old even though we are
    // currently flushing. This may mean we do too much flushing.
703    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
704  }
705
706  @Override
707  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
708    return rollWriter(false);
709  }
710
711  @Override
712  public final void sync() throws IOException {
713    sync(useHsync);
714  }
715
716  @Override
717  public final void sync(long txid) throws IOException {
718    sync(txid, useHsync);
719  }
720
721  @Override
722  public final void sync(boolean forceSync) throws IOException {
723    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
724  }
725
726  @Override
727  public final void sync(long txid, boolean forceSync) throws IOException {
728    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
729  }
730
731  @RestrictedApi(explanation = "Should only be called in tests", link = "",
732      allowedOnPath = ".*/src/test/.*")
733  public SequenceIdAccounting getSequenceIdAccounting() {
734    return sequenceIdAccounting;
735  }
736
737  /**
738   * This is a convenience method that computes a new filename with a given file-number.
739   * @param filenum to use
740   */
741  protected Path computeFilename(final long filenum) {
742    if (filenum < 0) {
743      throw new RuntimeException("WAL file number can't be < 0");
744    }
745    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
746    return new Path(walDir, child);
747  }
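  // Illustrative example (hypothetical names): with walFilePrefix
  // "server1%2C16020%2C1600000000000", filenum 1600000012345 and an empty suffix, and assuming
  // WAL_FILE_NAME_DELIMITER is ".", this returns a path ending in
  // "server1%2C16020%2C1600000000000.1600000012345".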
748
  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   */
753  public Path getCurrentFileName() {
754    return computeFilename(this.filenum.get());
755  }
756
757  /**
758   * retrieve the next path to use for writing. Increments the internal filenum.
759   */
760  private Path getNewPath() throws IOException {
761    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
762    Path newPath = getCurrentFileName();
763    return newPath;
764  }
765
766  public Path getOldPath() {
767    long currentFilenum = this.filenum.get();
768    Path oldPath = null;
769    if (currentFilenum > 0) {
770      // ComputeFilename will take care of meta wal filename
771      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
773    return oldPath;
774  }
775
776  /**
777   * Tell listeners about pre log roll.
778   */
779  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
780    throws IOException {
781    coprocessorHost.preWALRoll(oldPath, newPath);
782
783    if (!this.listeners.isEmpty()) {
784      for (WALActionsListener i : this.listeners) {
785        i.preLogRoll(oldPath, newPath);
786      }
787    }
788  }
789
790  /**
791   * Tell listeners about post log roll.
792   */
793  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
794    throws IOException {
795    if (!this.listeners.isEmpty()) {
796      for (WALActionsListener i : this.listeners) {
797        i.postLogRoll(oldPath, newPath);
798      }
799    }
800
801    coprocessorHost.postWALRoll(oldPath, newPath);
802  }
803
804  // public only until class moves to o.a.h.h.wal
805  /** Returns the number of rolled log files */
806  public int getNumRolledLogFiles() {
807    return walFile2Props.size();
808  }
809
810  // public only until class moves to o.a.h.h.wal
811  /** Returns the number of log files in use */
812  public int getNumLogFiles() {
    // +1 for the log currently in use
814    return getNumRolledLogFiles() + 1;
815  }
816
817  /**
818   * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the
819   * first (oldest) WAL, and return those regions which should be flushed so that it can be
820   * let-go/'archived'.
821   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file
822   */
823  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
824    Map<byte[], List<byte[]>> regions = null;
825    int logCount = getNumRolledLogFiles();
826    if (logCount > this.maxLogs && logCount > 0) {
827      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
828      regions =
829        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
830    }
831    if (regions != null) {
832      List<String> listForPrint = new ArrayList<>();
833      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
834        StringBuilder families = new StringBuilder();
835        for (int i = 0; i < r.getValue().size(); i++) {
836          if (i > 0) {
837            families.append(",");
838          }
839          families.append(Bytes.toString(r.getValue().get(i)));
840        }
841        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
842      }
843      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
844        + "; forcing (partial) flush of " + regions.size() + " region(s): "
845        + StringUtils.join(",", listForPrint));
846    }
847    return regions;
848  }
849
850  /**
851   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
852   */
853  private void markClosedAndClean(Path path) {
854    WALProps props = walFile2Props.get(path);
    // typically this should not be null, but there is no big issue if it is already null, so
    // let's make the code more robust
857    if (props != null) {
858      props.closed = true;
859      cleanOldLogs();
860    }
861  }
862
863  /**
864   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
865   * <p/>
866   * Use synchronized because we may call this method in different threads, normally when replacing
867   * writer, and since now close writer may be asynchronous, we will also call this method in the
868   * closeExecutor, right after we actually close a WAL writer.
869   */
870  private synchronized void cleanOldLogs() {
871    List<Pair<Path, Long>> logsToArchive = null;
872    long now = System.nanoTime();
873    boolean mayLogTooOld = nextLogTooOldNs <= now;
874    ArrayList<byte[]> regionsBlockingWal = null;
875    // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids
876    // are older than what is currently in memory, the WAL can be GC'd.
877    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
878      if (!e.getValue().closed) {
879        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
880        continue;
881      }
882      Path log = e.getKey();
883      ArrayList<byte[]> regionsBlockingThisWal = null;
884      long ageNs = now - e.getValue().rollTimeNs;
885      if (ageNs > walTooOldNs) {
886        if (mayLogTooOld && regionsBlockingWal == null) {
887          regionsBlockingWal = new ArrayList<>();
888        }
889        regionsBlockingThisWal = regionsBlockingWal;
890      }
891      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
892      if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {
893        if (logsToArchive == null) {
894          logsToArchive = new ArrayList<>();
895        }
896        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
897        if (LOG.isTraceEnabled()) {
898          LOG.trace("WAL file ready for archiving " + log);
899        }
900      } else if (regionsBlockingThisWal != null) {
901        StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")
902          .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");
903        boolean isFirst = true;
904        for (byte[] region : regionsBlockingThisWal) {
905          if (!isFirst) {
906            sb.append("; ");
907          }
908          isFirst = false;
909          sb.append(Bytes.toString(region));
910        }
911        LOG.warn(sb.toString());
912        nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
913        regionsBlockingThisWal.clear();
914      }
915    }
916
917    if (logsToArchive != null) {
918      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
919      // make it async
920      for (Pair<Path, Long> log : localLogsToArchive) {
921        logArchiveExecutor.execute(() -> {
922          archive(log);
923        });
924        this.walFile2Props.remove(log.getFirst());
925      }
926    }
927  }
928
929  protected void archive(final Pair<Path, Long> log) {
930    totalLogSize.addAndGet(-log.getSecond());
931    int retry = 1;
932    while (true) {
933      try {
934        archiveLogFile(log.getFirst());
935        // successful
936        break;
937      } catch (Throwable e) {
938        if (retry > archiveRetries) {
          LOG.error("Failed log archiving for the log {}", log.getFirst(), e);
940          if (this.abortable != null) {
941            this.abortable.abort("Failed log archiving", e);
942            break;
943          }
944        } else {
945          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
946        }
947        retry++;
948      }
949    }
950  }
951
  /**
   * Only public so WALSplitter can use it.
   * @return archived location of a WAL file with the given path p
   */
956  public static Path getWALArchivePath(Path archiveDir, Path p) {
957    return new Path(archiveDir, p.getName());
958  }
959
960  protected void archiveLogFile(final Path p) throws IOException {
961    Path newPath = getWALArchivePath(this.walArchiveDir, p);
962    // Tell our listeners that a log is going to be archived.
963    if (!this.listeners.isEmpty()) {
964      for (WALActionsListener i : this.listeners) {
965        i.preLogArchive(p, newPath);
966      }
967    }
968    LOG.info("Archiving " + p + " to " + newPath);
969    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
970      throw new IOException("Unable to rename " + p + " to " + newPath);
971    }
972    // Tell our listeners that a log has been archived.
973    if (!this.listeners.isEmpty()) {
974      for (WALActionsListener i : this.listeners) {
975        i.postLogArchive(p, newPath);
976      }
977    }
978  }
979
980  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
981    int oldNumEntries = this.numEntries.getAndSet(0);
982    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
983    if (oldPath != null) {
984      this.walFile2Props.put(oldPath,
985        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
986      this.totalLogSize.addAndGet(oldFileLen);
987      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
988        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
989        newPathString);
990    } else {
991      LOG.info("New WAL {}", newPathString);
992    }
993  }
994
995  private Span createSpan(String name) {
996    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
997  }
998
999  /**
   * Cleans up the current writer, closing it, and then puts in place the passed in
   * {@code nextWriter}.
1001   * <p/>
1002   * <ul>
1003   * <li>In the case of creating a new WAL, oldPath will be null.</li>
1004   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
1005   * </li>
1006   * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
1007   * null.</li>
1008   * </ul>
1009   * @param oldPath    may be null
1010   * @param newPath    may be null
1011   * @param nextWriter may be null
1012   * @return the passed in <code>newPath</code>
1013   * @throws IOException if there is a problem flushing or closing the underlying FS
1014   */
1015  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
1016    return TraceUtil.trace(() -> {
1017      doReplaceWriter(oldPath, newPath, nextWriter);
1018      return newPath;
1019    }, () -> createSpan("WAL.replaceWriter"));
1020  }
1021
1022  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
1023    // Now we have published the ringbuffer, halt the current thread until we get an answer back.
1024    try {
1025      if (syncFuture != null) {
1026        if (closed) {
1027          throw new IOException("WAL has been closed");
1028        } else {
1029          syncFuture.get(walSyncTimeoutNs);
1030        }
1031      }
1032    } catch (TimeoutIOException tioe) {
1033      throw new WALSyncTimeoutIOException(tioe);
1034    } catch (InterruptedException ie) {
1035      LOG.warn("Interrupted", ie);
1036      throw convertInterruptedExceptionToIOException(ie);
1037    } catch (ExecutionException e) {
1038      throw ensureIOException(e.getCause());
1039    }
1040  }
1041
1042  private static IOException ensureIOException(final Throwable t) {
1043    return (t instanceof IOException) ? (IOException) t : new IOException(t);
1044  }
1045
1046  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1047    Thread.currentThread().interrupt();
1048    IOException ioe = new InterruptedIOException();
1049    ioe.initCause(ie);
1050    return ioe;
1051  }
1052
1053  private W createCombinedWriter(W localWriter, Path localPath)
1054    throws IOException, CommonFSUtils.StreamLacksCapabilityException {
    // Retry forever if we cannot create the remote writer, to prevent aborting the RS due to a log
    // rolling error, unless skipRemoteWAL is set to true.
1057    // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
1058    // other wals
1059    Path remoteWAL = new Path(remoteWALDir, localPath.getName());
1060    for (int retry = 0;; retry++) {
1061      if (skipRemoteWAL) {
1062        return localWriter;
1063      }
1064      W remoteWriter;
1065      try {
1066        remoteWriter = createWriterInstance(remoteFs, remoteWAL);
1067      } catch (IOException e) {
1068        LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
1069        try {
1070          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
1071        } catch (InterruptedException ie) {
1072          // restore the interrupt state
1073          Thread.currentThread().interrupt();
1074          // must close local writer here otherwise no one will close it for us
1075          Closeables.close(localWriter, true);
1076          throw (IOException) new InterruptedIOException().initCause(ie);
1077        }
1078        continue;
1079      }
1080      return createCombinedWriter(localWriter, remoteWriter);
1081    }
1082  }
1083
1084  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
1085    rollWriterLock.lock();
1086    try {
1087      if (this.closed) {
1088        throw new WALClosedException("WAL has been closed");
1089      }
1090      // Return if nothing to flush.
1091      if (!force && this.writer != null && this.numEntries.get() <= 0) {
1092        return null;
1093      }
1094      Map<byte[], List<byte[]>> regionsToFlush = null;
1095      try {
1096        Path oldPath = getOldPath();
1097        Path newPath = getNewPath();
1098        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
1099        W nextWriter = this.createWriterInstance(fs, newPath);
1100        if (remoteFs != null) {
1101          // create a remote wal if necessary
1102          nextWriter = createCombinedWriter(nextWriter, newPath);
1103        }
1104        tellListenersAboutPreLogRoll(oldPath, newPath);
1105        // NewPath could be equal to oldPath if replaceWriter fails.
1106        newPath = replaceWriter(oldPath, newPath, nextWriter);
1107        tellListenersAboutPostLogRoll(oldPath, newPath);
1108        if (LOG.isDebugEnabled()) {
1109          LOG.debug("Create new " + implClassName + " writer with pipeline: "
1110            + FanOutOneBlockAsyncDFSOutputHelper
1111              .getDataNodeInfo(Arrays.stream(getPipeline()).collect(Collectors.toList())));
1112        }
1113        // We got a new writer, so reset the slow sync count
1114        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
1115        slowSyncCount.set(0);
1116        // Can we delete any of the old log files?
1117        if (getNumRolledLogFiles() > 0) {
1118          cleanOldLogs();
1119          regionsToFlush = findRegionsToForceFlush();
1120        }
1121      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
1122        // If the underlying FileSystem can't do what we ask, treat as IO failure, so
1123        // we'll abort.
1124        throw new IOException(
1125          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
1126          exception);
1127      }
1128      return regionsToFlush;
1129    } finally {
1130      rollWriterLock.unlock();
1131    }
1132  }
1133
1134  @Override
1135  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
1136    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
1137  }
1138
1139  // public only until class moves to o.a.h.h.wal
1140  /** Returns the size of log files in use */
1141  public long getLogFileSize() {
1142    return this.totalLogSize.get();
1143  }
1144
1145  // public only until class moves to o.a.h.h.wal
1146  public void requestLogRoll() {
1147    requestLogRoll(ERROR);
1148  }
1149
1150  /**
1151   * Get the backing files associated with this WAL.
1152   * @return may be null if there are no files.
1153   */
1154  FileStatus[] getFiles() throws IOException {
1155    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
1156  }
1157
1158  @Override
1159  public void shutdown() throws IOException {
1160    if (!shutdown.compareAndSet(false, true)) {
1161      return;
1162    }
1163    closed = true;
1164    // Tell our listeners that the log is closing
1165    if (!this.listeners.isEmpty()) {
1166      for (WALActionsListener i : this.listeners) {
1167        i.logCloseRequested();
1168      }
1169    }
1170
1171    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
1172      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());
1173
1174    Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
1175      @Override
1176      public Void call() throws Exception {
        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
1178          try {
1179            doShutdown();
1180            if (syncFutureCache != null) {
1181              syncFutureCache.clear();
1182            }
1183          } finally {
1184            rollWriterLock.unlock();
1185          }
1186        } else {
1187          throw new IOException("Waiting for rollWriterLock timeout");
1188        }
1189        return null;
1190      }
1191    });
1192    shutdownExecutor.shutdown();
1193
1194    try {
1195      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
1196    } catch (InterruptedException e) {
1197      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
1198    } catch (TimeoutException e) {
1199      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
1200        + " the shutdown of WAL doesn't complete! Please check the status of underlying "
1201        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
1202        + "\"", e);
1203    } catch (ExecutionException e) {
1204      if (e.getCause() instanceof IOException) {
1205        throw (IOException) e.getCause();
1206      } else {
1207        throw new IOException(e.getCause());
1208      }
1209    } finally {
1210      // in shutdown, we may call cleanOldLogs so shutdown this executor in the end.
1211      // In sync replication implementation, we may shut down a WAL without shutting down the whole
1212      // region server, if we shut down this executor earlier we may get reject execution exception
1213      // and abort the region server
1214      logArchiveExecutor.shutdown();
1215    }
    // We also need to wait for the log archive tasks to finish if we want a graceful shutdown, as
    // we may still have some pending archiving tasks not finished yet; in close we may archive all
    // the remaining WAL files, so there could be a race if we do not wait for the background
    // archive tasks to finish.
1220    try {
1221      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
1222        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
1223          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
1224          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
1225          + "\"");
1226      }
1227    } catch (InterruptedException e) {
1228      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
1229    }
1230  }
1231
1232  @Override
1233  public void close() throws IOException {
1234    shutdown();
1235    final FileStatus[] files = getFiles();
1236    if (null != files && 0 != files.length) {
1237      for (FileStatus file : files) {
1238        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
1239        // Tell our listeners that a log is going to be archived.
1240        if (!this.listeners.isEmpty()) {
1241          for (WALActionsListener i : this.listeners) {
1242            i.preLogArchive(file.getPath(), p);
1243          }
1244        }
1245
1246        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1247          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1248        }
1249        // Tell our listeners that a log was archived.
1250        if (!this.listeners.isEmpty()) {
1251          for (WALActionsListener i : this.listeners) {
1252            i.postLogArchive(file.getPath(), p);
1253          }
1254        }
1255      }
1256      LOG.debug(
1257        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
1258    }
1259    LOG.info("Closed WAL: " + toString());
1260  }
1261
1262  /** Returns number of WALs currently in the process of closing. */
1263  public int getInflightWALCloseCount() {
1264    return inflightWALClosures.size();
1265  }
1266
1267  /**
   * Updates the sequence number of a specific store. Depending on the flag, replaces the current
   * sequence number only if the given seq id is bigger, or even if it is lower than the existing
   * one.
1270   */
1271  @Override
1272  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
1273    boolean onlyIfGreater) {
1274    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
1275  }
1276
1277  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
1278    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
1279  }
1280
1281  protected boolean isLogRollRequested() {
1282    return rollRequested.get();
1283  }
1284
1285  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
1286    // If we have already requested a roll, don't do it again
1287    // And only set rollRequested to true when there is a registered listener
1288    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
1289      for (WALActionsListener i : this.listeners) {
1290        i.logRollRequested(reason);
1291      }
1292    }
1293  }
1294
1295  long getUnflushedEntriesCount() {
1296    long highestSynced = this.highestSyncedTxid.get();
1297    long highestUnsynced = this.highestUnsyncedTxid;
1298    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
1299  }
1300
1301  boolean isUnflushedEntries() {
1302    return getUnflushedEntriesCount() > 0;
1303  }
1304
1305  /**
1306   * Exposed for testing only. Use to tricks like halt the ring buffer appending.
1307   */
1308  protected void atHeadOfRingBufferEventHandlerAppend() {
1309    // Noop
1310  }
1311
1312  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
1313    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
1314    atHeadOfRingBufferEventHandlerAppend();
1315    long start = EnvironmentEdgeManager.currentTime();
1316    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
1317    long regionSequenceId = entry.getKey().getSequenceId();
1318
    // Edits may be empty, in which case there is nothing to append. This happens when we only want
    // a region sequence id, i.e. a region edit/sequence id that is not associated with an actual
    // edit. It still has to go through all the rigmarole to be sure we have the right ordering.
1322    if (entry.getEdit().isEmpty()) {
1323      return false;
1324    }
1325
1326    // Coprocessor hook.
1327    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1328    if (!listeners.isEmpty()) {
1329      for (WALActionsListener i : listeners) {
1330        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1331      }
1332    }
1333    doAppend(writer, entry);
1334    assert highestUnsyncedTxid < entry.getTxid();
1335    highestUnsyncedTxid = entry.getTxid();
1336    if (entry.isCloseRegion()) {
1337      // let's clean all the records of this region
1338      sequenceIdAccounting.onRegionClose(encodedRegionName);
1339    } else {
1340      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1341        entry.isInMemStore());
1342    }
1343    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
1344    // Update metrics.
1345    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
1346    numEntries.incrementAndGet();
1347    return true;
1348  }
1349
1350  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
1351    long len = 0;
1352    if (!listeners.isEmpty()) {
1353      for (Cell cell : e.getEdit().getCells()) {
1354        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
1355      }
1356      for (WALActionsListener listener : listeners) {
1357        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
1358      }
1359    }
1360    return len;
1361  }
1362
1363  protected final void postSync(long timeInNanos, int handlerSyncs) {
1364    if (timeInNanos > this.slowSyncNs) {
1365      String msg = new StringBuilder().append("Slow sync cost: ")
1366        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
1367        .append(Arrays.toString(getPipeline())).toString();
1368      LOG.info(msg);
1369      if (timeInNanos > this.rollOnSyncNs) {
1370        // A single sync took too long.
1371        // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
1372        // effects. Here we have a single data point that indicates we should take immediate
1373        // action, so do so.
1374        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
1375          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
1376          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
1377          + Arrays.toString(getPipeline()));
1378        requestLogRoll(SLOW_SYNC);
1379      }
1380      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
1381    }
1382    if (!listeners.isEmpty()) {
1383      for (WALActionsListener listener : listeners) {
1384        listener.postSync(timeInNanos, handlerSyncs);
1385      }
1386    }
1387  }
1388
1389  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1390    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
1391    if (this.closed) {
1392      throw new IOException(
1393        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
1394    }
1395    MutableLong txidHolder = new MutableLong();
1396    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
1397      txidHolder.setValue(ringBuffer.next());
1398    });
1399    long txid = txidHolder.longValue();
1400    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
1401    try {
1402      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
1403      entry.stampRegionSequenceId(we);
1404      ringBuffer.get(txid).load(entry);
1405    } finally {
1406      ringBuffer.publish(txid);
1407    }
1408    return txid;
1409  }
1410
1411  @Override
1412  public String toString() {
1413    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
1414  }
1415
1416  /**
   * If the given {@code path} is currently being written, return its length.
1418   * <p>
1419   * This is used by replication to prevent replicating unacked log entries. See
1420   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
1421   */
1422  @Override
1423  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
1424    rollWriterLock.lock();
1425    try {
1426      Path currentPath = getOldPath();
1427      if (path.equals(currentPath)) {
1428        // Currently active path.
1429        W writer = this.writer;
1430        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
1431      } else {
1432        W temp = inflightWALClosures.get(path.getName());
1433        if (temp != null) {
1434          // In the process of being closed, trailer bytes may or may not be flushed.
1435          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
1436          // use cases like replication, see HBASE-25924/HBASE-25932.
1437          return OptionalLong.of(temp.getSyncedLength());
1438        }
1439        // Log rolled successfully.
1440        return OptionalLong.empty();
1441      }
1442    } finally {
1443      rollWriterLock.unlock();
1444    }
1445  }
1446
1447  @Override
1448  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1449    return TraceUtil.trace(() -> append(info, key, edits, true),
1450      () -> createSpan("WAL.appendData"));
1451  }
1452
1453  @Override
1454  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
1455    return TraceUtil.trace(() -> append(info, key, edits, false),
1456      () -> createSpan("WAL.appendMarker"));
1457  }
1458
1459  /**
1460   * Helper that marks the future as DONE and offers it back to the cache.
1461   */
1462  protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
1463    future.done(txid, t);
1464    syncFutureCache.offer(future);
1465  }
1466
1467  private static boolean waitingRoll(int epochAndState) {
1468    return (epochAndState & 1) != 0;
1469  }
1470
1471  private static boolean writerBroken(int epochAndState) {
1472    return ((epochAndState >>> 1) & 1) != 0;
1473  }
1474
1475  private static int epoch(int epochAndState) {
1476    return epochAndState >>> 2;
1477  }
1478
1479  // return whether we have successfully set readyForRolling to true.
1480  private boolean trySetReadyForRolling() {
1481    // Check without holding lock first. Usually we will just return here.
    // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
    // safe to check them outside the consumeLock.
1484    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
1485      return false;
1486    }
1487    consumeLock.lock();
1488    try {
1489      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed this above).
1491      if (waitingRoll(epochAndState)) {
1492        readyForRolling = true;
1493        readyForRollingCond.signalAll();
1494        return true;
1495      } else {
1496        return false;
1497      }
1498    } finally {
1499      consumeLock.unlock();
1500    }
1501  }
1502
1503  private void syncFailed(long epochWhenSync, Throwable error) {
1504    LOG.warn("sync failed", error);
1505    this.onException(epochWhenSync, error);
1506  }
1507
1508  private void onException(long epochWhenSync, Throwable error) {
1509    boolean shouldRequestLogRoll = true;
1510    consumeLock.lock();
1511    try {
1512      int currentEpochAndState = epochAndState;
1513      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // either the epoch has changed, meaning the failing sync belongs to a previous writer that
        // has already been rolled, or this is still the current writer but we have already marked
        // it as broken and requested a roll.
1517        return;
1518      }
1519      this.epochAndState = currentEpochAndState | 0b10;
1520      if (waitingRoll(currentEpochAndState)) {
1521        readyForRolling = true;
1522        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter call, so just tell the roller
        // thread that it can continue without requesting an extra log roll.
1525        shouldRequestLogRoll = false;
1526      }
1527    } finally {
1528      consumeLock.unlock();
1529    }
1530    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
1531      toWriteAppends.addFirst(iter.next());
1532    }
1533    highestUnsyncedTxid = highestSyncedTxid.get();
1534    if (shouldRequestLogRoll) {
1535      // request a roll.
1536      requestLogRoll(ERROR);
1537    }
1538  }
1539
1540  private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) {
    // Please see the last several comments on HBASE-22761: it is possible that we get a
    // syncCompleted which acks a previous sync request after we have received a syncFailed on the
    // same writer. So here we also check the epoch and state: if the epoch has already changed,
    // i.e. we have already rolled the writer, or the writer is already broken, we should just skip
    // here, to avoid messing up the state or accidentally releasing some WAL entries and causing
    // data corruption.
    // The syncCompleted call is on the critical write path, so we should try our best to make it
    // fast. So here we do not hold the consumeLock, for better performance. This is safe because
    // there are only 3 possible situations:
    // 1. In the normal case, the only place where we change epochAndState is when rolling the
    // writer. Before the roll actually happens, we only change the state to waitingRoll, which is a
    // different bit from writerBroken, and when we actually change the epoch, we can make sure that
    // there is no outgoing sync request. So we will always pass the check here and there is no
    // problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we face the same situation as
    // #1.
    // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are
    // only 2 possible situations:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch has changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we always update the
    // epochAndState as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
1566    int epochAndState = this.epochAndState;
1567    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
1568      LOG.warn("Got a sync complete call after the writer is broken, skip");
1569      return;
1570    }
1571
1572    if (processedTxid < highestSyncedTxid.get()) {
1573      return;
1574    }
1575    highestSyncedTxid.set(processedTxid);
1576    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
1577      FSWALEntry entry = iter.next();
1578      if (entry.getTxid() <= processedTxid) {
1579        entry.release();
1580        iter.remove();
1581      } else {
1582        break;
1583      }
1584    }
1585    postSync(System.nanoTime() - startTimeNs, finishSync());
1586    /**
1587     * This method is used to be compatible with the original logic of {@link FSHLog}.
1588     */
1589    checkSlowSyncCount();
1590    if (trySetReadyForRolling()) {
      // a roll is in progress and we have just signaled that it can proceed, so there is no need
      // to check for log rolling here; the writer will be closed soon.
1593      return;
1594    }
1595    // If we haven't already requested a roll, check if we have exceeded logrollsize
1596    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
1597      if (LOG.isDebugEnabled()) {
1598        LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()
1599          + ", logrollsize=" + logrollsize);
1600      }
1601      requestLogRoll(SIZE);
1602    }
1603  }
1604
  // Find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures then just use the default sync mode.
1607  private boolean isHsync(long beginTxid, long endTxid) {
1608    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
1609      new SyncFuture().reset(endTxid + 1, false));
1610    if (futures.isEmpty()) {
1611      return useHsync;
1612    }
1613    for (SyncFuture future : futures) {
1614      if (future.isForceSync()) {
1615        return true;
1616      }
1617    }
1618    return false;
1619  }
1620
1621  private void sync(W writer) {
1622    fileLengthAtLastSync = writer.getLength();
1623    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
1624    boolean shouldUseHsync =
1625      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
1626    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
1627    final long startTimeNs = System.nanoTime();
1628    final long epoch = (long) epochAndState >>> 2L;
1629    addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),
1630      (result, error) -> {
1631        if (error != null) {
1632          syncFailed(epoch, error);
1633        } else {
1634          long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);
1635          syncCompleted(epoch, writer, syncedTxid, startTimeNs);
1636        }
1637      }, consumeExecutor);
1638  }
1639
1640  /**
1641   * This method is to adapt {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, we use
1642   * {@link AbstractFSWAL#highestProcessedAppendTxid} at the point we calling
1643   * {@link AsyncFSWAL#doWriterSync} method as successful syncedTxid. For {@link FSHLog}, because we
1644   * use multi-thread {@code SyncRunner}s, we used the result of {@link CompletableFuture} as
1645   * successful syncedTxid.
1646   */
1647  protected long getSyncedTxid(long processedTxid, long completableFutureResult) {
1648    return processedTxid;
1649  }
1650
1651  protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync,
    long txidWhenSync);
1653
1654  private int finishSyncLowerThanTxid(long txid) {
1655    int finished = 0;
1656    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
1657      SyncFuture sync = iter.next();
1658      if (sync.getTxid() <= txid) {
1659        markFutureDoneAndOffer(sync, txid, null);
1660        iter.remove();
1661        finished++;
1662      } else {
1663        break;
1664      }
1665    }
1666    return finished;
1667  }
1668
1669  // try advancing the highestSyncedTxid as much as possible
1670  private int finishSync() {
1671    if (unackedAppends.isEmpty()) {
1672      // All outstanding appends have been acked.
1673      if (toWriteAppends.isEmpty()) {
        // Also no appends waiting to be written out, so just finish all pending syncs.
1675        long maxSyncTxid = highestSyncedTxid.get();
1676        for (SyncFuture sync : syncFutures) {
1677          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
1678          markFutureDoneAndOffer(sync, maxSyncTxid, null);
1679        }
1680        highestSyncedTxid.set(maxSyncTxid);
1681        int finished = syncFutures.size();
1682        syncFutures.clear();
1683        return finished;
1684      } else {
1685        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so
1686        // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txid are between
1687        // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
1688        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
1689        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
1690        long doneTxid = lowestUnprocessedAppendTxid - 1;
1691        highestSyncedTxid.set(doneTxid);
1692        return finishSyncLowerThanTxid(doneTxid);
1693      }
1694    } else {
1695      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
1696      // first unacked append minus 1.
1697      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
1698      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
1699      highestSyncedTxid.set(doneTxid);
1700      return finishSyncLowerThanTxid(doneTxid);
1701    }
1702  }
1703
1704  // confirm non-empty before calling
1705  private static long getLastTxid(Deque<FSWALEntry> queue) {
1706    return queue.peekLast().getTxid();
1707  }
1708
1709  private void appendAndSync() throws IOException {
1710    final W writer = this.writer;
    // maybe a sync request was not yet queued when we issued the last sync, so check here to see
    // if we can finish some sync futures now.
1713    finishSync();
1714    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast on unackedAppends every time; appendAndSync is single
    // threaded, so this could save us some cycles
1717    boolean addedToUnackedAppends = false;
1718    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
1719      FSWALEntry entry = iter.next();
1720      /**
1721       * For {@link FSHog},here may throw IOException,but for {@link AsyncFSWAL}, here would not
1722       * throw any IOException.
1723       */
1724      boolean appended = appendEntry(writer, entry);
1725      newHighestProcessedAppendTxid = entry.getTxid();
1726      iter.remove();
1727      if (appended) {
        // This is possible because, when we fail to sync, we add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in unackedAppends.
1730        if (
1731          addedToUnackedAppends || unackedAppends.isEmpty()
1732            || getLastTxid(unackedAppends) < entry.getTxid()
1733        ) {
1734          unackedAppends.addLast(entry);
1735          addedToUnackedAppends = true;
1736        }
        // See HBASE-25905: here we need to make sure that we always write all the entries in
        // unackedAppends out, as the code in the consume method assumes that the entries in
        // unackedAppends have all been sent out. So if there is a roll request and unackedAppends
        // is not empty, we can just return, as later there will be a syncCompleted call to clear
        // unackedAppends, or a syncFailed that leads us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and then (may) lead to a big refactoring. So let's
        // use this way to fix it first; we can optimize later.
1745        if (
1746          writer.getLength() - fileLengthAtLastSync >= batchSize
1747            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
1748        ) {
1749          break;
1750        }
1751      }
1752    }
1753    // if we have a newer transaction id, update it.
1754    // otherwise, use the previous transaction id.
1755    if (newHighestProcessedAppendTxid > 0) {
1756      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
1757    } else {
1758      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
1759    }
1760
1761    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
1762      // sync because buffer size limit.
1763      sync(writer);
1764      return;
1765    }
1766    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, so just advance highestSyncedTxid since we may have only
      // stamped some region sequence ids.
1769      if (unackedAppends.isEmpty()) {
1770        highestSyncedTxid.set(highestProcessedAppendTxid);
1771        finishSync();
1772        trySetReadyForRolling();
1773      }
1774      return;
1775    }
1776    // reach here means that we have some unsynced data but haven't reached the batch size yet,
1777    // but we will not issue a sync directly here even if there are sync requests because we may
1778    // have some new data in the ringbuffer, so let's just return here and delay the decision of
1779    // whether to issue a sync in the caller method.
1780  }
1781
1782  private void consume() {
1783    consumeLock.lock();
1784    try {
1785      int currentEpochAndState = epochAndState;
1786      if (writerBroken(currentEpochAndState)) {
1787        return;
1788      }
1789      if (waitingRoll(currentEpochAndState)) {
1790        if (writer.getLength() > fileLengthAtLastSync) {
1791          // issue a sync
1792          sync(writer);
1793        } else {
1794          if (unackedAppends.isEmpty()) {
1795            readyForRolling = true;
1796            readyForRollingCond.signalAll();
1797          }
1798        }
1799        return;
1800      }
1801    } finally {
1802      consumeLock.unlock();
1803    }
1804    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
1805    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
1806        <= cursorBound; nextCursor++) {
1807      if (!waitingConsumePayloads.isPublished(nextCursor)) {
1808        break;
1809      }
1810      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
1811      switch (truck.type()) {
1812        case APPEND:
1813          toWriteAppends.addLast(truck.unloadAppend());
1814          break;
1815        case SYNC:
1816          syncFutures.add(truck.unloadSync());
1817          break;
1818        default:
1819          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
1820          break;
1821      }
1822      waitingConsumePayloadsGatingSequence.set(nextCursor);
1823    }
1824
1825    /**
1826     * This method is used to be compatible with the original logic of {@link AsyncFSWAL}.
1827     */
1828    if (markerEditOnly) {
1829      drainNonMarkerEditsAndFailSyncs();
1830    }
1831    try {
1832      appendAndSync();
1833    } catch (IOException exception) {
1834      /**
1835       * For {@link FSHog},here may catch IOException,but for {@link AsyncFSWAL}, the code doesn't
1836       * go in here.
1837       */
1838      LOG.error("appendAndSync throws IOException.", exception);
1839      onAppendEntryFailed(exception);
1840      return;
1841    }
1842    if (hasConsumerTask.get()) {
1843      return;
1844    }
1845    if (toWriteAppends.isEmpty()) {
1846      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1847        consumerScheduled.set(false);
        // recheck here since in append and sync we do not hold the consumeLock. Things may
        // happen like:
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer; since consumerScheduled is still
        // true, they give up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
1854        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
1855          // we will give up consuming so if there are some unsynced data we need to issue a sync.
1856          if (
1857            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
1858              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
1859          ) {
1860            // no new data in the ringbuffer and we have at least one sync request
1861            sync(writer);
1862          }
1863          return;
1864        } else {
1865          // maybe someone has grabbed this before us
1866          if (!consumerScheduled.compareAndSet(false, true)) {
1867            return;
1868          }
1869        }
1870      }
1871    }
1872    // reschedule if we still have something to write.
1873    consumeExecutor.execute(consumer);
1874  }
1875
1876  private boolean shouldScheduleConsumer() {
1877    int currentEpochAndState = epochAndState;
1878    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
1879      return false;
1880    }
1881    return consumerScheduled.compareAndSet(false, true);
1882  }
1883
1884  /**
1885   * Append a set of edits to the WAL.
1886   * <p/>
1887   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
1888   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
1889   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
1890   * <p/>
   * NOTE: This append starts an mvcc transaction, usually at a time after this call returns, by
   * calling 'begin', wherein we assign this update a sequenceid. At assignment time, we stamp all
   * the passed in Cells inside the WALEdit with their sequenceId. You must 'complete' this mvcc
   * transaction by calling MultiVersionConcurrencyControl#complete(...) or a variant, otherwise
   * mvcc will get stuck. Do it in the finally of a try/finally block within which this append
   * lives, along with any subsequent operations like sync or update of memstore, etc. Get the
   * WriteEntry to pass to mvcc out of the passed in WALKey <code>key</code> parameter. Be warned
   * that the WriteEntry is not immediately available on return from this method. It WILL be
   * available subsequent to a sync of this append; otherwise, you will just have to wait on the
   * WriteEntry to get filled in.
1901   * @param hri        the regioninfo associated with append
1902   * @param key        Modified by this call; we add to it this edits region edit/sequence id.
1903   * @param edits      Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1904   *                   sequence id that is after all currently appended edits.
   * @param inMemstore Always true except for the case where we are writing a region event meta
   *                   marker edit, for example, a compaction completion record into the WAL or
   *                   noting a Region Open event. In these cases the entry is just so we can finish
   *                   an unfinished compaction after a crash when the new Server reads the WAL on
   *                   recovery, etc. These transition event 'Markers' do not go via the memstore.
   *                   When inMemstore is false, we presume a Marker event edit.
1911   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1912   *         in it.
1913   */
1914  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1915    throws IOException {
1916    if (markerEditOnly && !edits.isMetaEdit()) {
1917      throw new IOException("WAL is closing, only marker edit is allowed");
1918    }
1919    long txid =
1920      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
1921    if (shouldScheduleConsumer()) {
1922      consumeExecutor.execute(consumer);
1923    }
1924    return txid;
1925  }
1926
1927  protected void doSync(boolean forceSync) throws IOException {
1928    long txid = waitingConsumePayloads.next();
1929    SyncFuture future;
1930    try {
1931      future = getSyncFuture(txid, forceSync);
1932      RingBufferTruck truck = waitingConsumePayloads.get(txid);
1933      truck.load(future);
1934    } finally {
1935      waitingConsumePayloads.publish(txid);
1936    }
1937    if (shouldScheduleConsumer()) {
1938      consumeExecutor.execute(consumer);
1939    }
1940    blockOnSync(future);
1941  }
1942
1943  protected void doSync(long txid, boolean forceSync) throws IOException {
1944    if (highestSyncedTxid.get() >= txid) {
1945      return;
1946    }
1947    // here we do not use ring buffer sequence as txid
1948    long sequence = waitingConsumePayloads.next();
1949    SyncFuture future;
1950    try {
1951      future = getSyncFuture(txid, forceSync);
1952      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
1953      truck.load(future);
1954    } finally {
1955      waitingConsumePayloads.publish(sequence);
1956    }
1957    if (shouldScheduleConsumer()) {
1958      consumeExecutor.execute(consumer);
1959    }
1960    blockOnSync(future);
1961  }
1962
1963  private void drainNonMarkerEditsAndFailSyncs() {
1964    if (toWriteAppends.isEmpty()) {
1965      return;
1966    }
1967    boolean hasNonMarkerEdits = false;
1968    Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
1969    while (iter.hasNext()) {
1970      FSWALEntry entry = iter.next();
1971      if (!entry.getEdit().isMetaEdit()) {
1972        entry.release();
1973        hasNonMarkerEdits = true;
1974        break;
1975      }
1976    }
1977    if (hasNonMarkerEdits) {
1978      for (;;) {
1979        iter.remove();
1980        if (!iter.hasNext()) {
1981          break;
1982        }
1983        iter.next().release();
1984      }
1985      for (FSWALEntry entry : unackedAppends) {
1986        entry.release();
1987      }
1988      unackedAppends.clear();
1989      // fail the sync futures which are under the txid of the first remaining edit, if none, fail
1990      // all the sync futures.
1991      long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
1992      IOException error = new IOException("WAL is closing, only marker edit is allowed");
1993      for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
1994        SyncFuture future = syncIter.next();
1995        if (future.getTxid() < txid) {
1996          markFutureDoneAndOffer(future, future.getTxid(), error);
1997          syncIter.remove();
1998        } else {
1999          break;
2000        }
2001      }
2002    }
2003  }
2004
2005  protected abstract W createWriterInstance(FileSystem fs, Path path)
2006    throws IOException, CommonFSUtils.StreamLacksCapabilityException;
2007
2008  protected abstract W createCombinedWriter(W localWriter, W remoteWriter);
2009
2010  protected final void waitForSafePoint() {
2011    consumeLock.lock();
2012    try {
2013      int currentEpochAndState = epochAndState;
2014      if (writerBroken(currentEpochAndState) || this.writer == null) {
2015        return;
2016      }
2017      consumerScheduled.set(true);
2018      epochAndState = currentEpochAndState | 1;
2019      readyForRolling = false;
2020      consumeExecutor.execute(consumer);
2021      while (!readyForRolling) {
2022        readyForRollingCond.awaitUninterruptibly();
2023      }
2024    } finally {
2025      consumeLock.unlock();
2026    }
2027  }
2028
2029  private void recoverLease(FileSystem fs, Path p, Configuration conf) {
2030    try {
2031      RecoverLeaseFSUtils.recoverFileLease(fs, p, conf, null);
2032    } catch (IOException ex) {
2033      LOG.error("Unable to recover lease after several attempts. Give up.", ex);
2034    }
2035  }
2036
2037  protected final void closeWriter(W writer, Path path) {
2038    inflightWALClosures.put(path.getName(), writer);
2039    closeExecutor.execute(() -> {
2040      try {
2041        writer.close();
2042      } catch (IOException e) {
2043        LOG.warn("close old writer failed.", e);
2044        recoverLease(this.fs, path, conf);
2045      } finally {
        // call this even if the above close fails, as there is no other chance to set closed to
        // true; it will not cause big problems.
2048        markClosedAndClean(path);
2049        inflightWALClosures.remove(path.getName());
2050      }
2051    });
2052  }
2053
2054  /**
2055   * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer
2056   * will begin to work before returning from this method. If we clear the flag after returning from
2057   * this call, we may miss a roll request. The implementation class should choose a proper place to
2058   * clear the {@link #rollRequested} flag, so we do not miss a roll request, typically before you
2059   * start writing to the new writer.
2060   */
2061  protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
2062    Preconditions.checkNotNull(nextWriter);
2063    waitForSafePoint();
2064    /**
2065     * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}.
2066     */
2067    doCleanUpResources();
    // we will call rollWriter in the init method, where we want to create the first writer and
    // obviously the previous writer is null, so here we need this null check. The reason we must
    // call logRollAndSetupWalProps before closeWriter is that we call markClosedAndClean after
    // closing the writer asynchronously, so we need to make sure the WALProps is put into
    // walFile2Props before we call markClosedAndClean
2073    if (writer != null) {
2074      long oldFileLen = writer.getLength();
2075      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
2076      closeWriter(writer, oldPath);
2077    } else {
2078      logRollAndSetupWalProps(oldPath, newPath, 0);
2079    }
2080    this.writer = nextWriter;
2081    /**
2082     * Here is used for {@link AsyncFSWAL} and {@link FSHLog} to set the under layer filesystem
2083     * output after writer is replaced.
2084     */
2085    onWriterReplaced(nextWriter);
2086    this.fileLengthAtLastSync = nextWriter.getLength();
2087    this.highestProcessedAppendTxidAtLastSync = 0L;
2088    consumeLock.lock();
2089    try {
2090      consumerScheduled.set(true);
2091      int currentEpoch = epochAndState >>> 2;
2092      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
2093      // set a new epoch and also clear waitingRoll and writerBroken
2094      this.epochAndState = nextEpoch << 2;
2095      // Reset rollRequested status
2096      rollRequested.set(false);
2097      consumeExecutor.execute(consumer);
2098    } finally {
2099      consumeLock.unlock();
2100    }
2101  }
2102
2103  protected abstract void onWriterReplaced(W nextWriter);
2104
2105  protected void doShutdown() throws IOException {
2106    waitForSafePoint();
2107    /**
2108     * For {@link FSHLog},here would shut down {@link FSHLog.SyncRunner}.
2109     */
2110    doCleanUpResources();
2111    if (this.writer != null) {
2112      closeWriter(this.writer, getOldPath());
2113      this.writer = null;
2114    }
2115    closeExecutor.shutdown();
2116    try {
2117      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
2118        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
2119          + " the close of async writer doesn't complete."
2120          + "Please check the status of underlying filesystem"
2121          + " or increase the wait time by the config \"" + this.waitOnShutdownInSecondsConfigKey
2122          + "\"");
2123      }
2124    } catch (InterruptedException e) {
2125      LOG.error("The wait for close of async writer is interrupted");
2126      Thread.currentThread().interrupt();
2127    }
2128    IOException error = new IOException("WAL has been closed");
2129    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
2130    // drain all the pending sync requests
2131    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
2132        <= cursorBound; nextCursor++) {
2133      if (!waitingConsumePayloads.isPublished(nextCursor)) {
2134        break;
2135      }
2136      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
2137      switch (truck.type()) {
2138        case SYNC:
2139          syncFutures.add(truck.unloadSync());
2140          break;
2141        default:
2142          break;
2143      }
2144    }
2145    // and fail them
2146    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
2147    if (this.shouldShutDownConsumeExecutorWhenClose) {
2148      consumeExecutor.shutdown();
2149    }
2150  }
2151
2152  protected void doCleanUpResources() {
2153  };
2154
2155  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
2156
2157  /**
2158   * This method gets the pipeline for the current WAL.
2159   */
2160  abstract DatanodeInfo[] getPipeline();
2161
2162  /**
2163   * This method gets the datanode replication count for the current WAL.
2164   */
2165  abstract int getLogReplication();
2166
2167  protected abstract boolean doCheckLogLowReplication();
2168
2169  protected boolean isWriterBroken() {
2170    return writerBroken(epochAndState);
2171  }
2172
2173  private void onAppendEntryFailed(IOException exception) {
2174    LOG.warn("append entry failed", exception);
2175    final long currentEpoch = (long) epochAndState >>> 2L;
2176    this.onException(currentEpoch, exception);
2177  }
2178
2179  protected void checkSlowSyncCount() {
2180  }
2181
2182  /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
2183  protected boolean doCheckSlowSync() {
2184    boolean result = false;
2185    long now = EnvironmentEdgeManager.currentTime();
2186    long elapsedTime = now - lastTimeCheckSlowSync;
2187    if (elapsedTime >= slowSyncCheckInterval) {
2188      if (slowSyncCount.get() >= slowSyncRollThreshold) {
2189        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
2190          // If two or more slowSyncCheckInterval have elapsed this is a corner case
2191          // where a train of slow syncs almost triggered us but then there was a long
2192          // interval from then until the one more that pushed us over. If so, we
2193          // should do nothing and let the count reset.
2194          if (LOG.isDebugEnabled()) {
2195            LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="
2196              + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
2197              + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
2198          }
2199          // Fall through to count reset below
2200        } else {
2201          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
2202            + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
2203            + Arrays.toString(getPipeline()));
2204          result = true;
2205        }
2206      }
2207      lastTimeCheckSlowSync = now;
2208      slowSyncCount.set(0);
2209    }
2210    return result;
2211  }
2212
2213  public void checkLogLowReplication(long checkInterval) {
2214    long now = EnvironmentEdgeManager.currentTime();
2215    if (now - lastTimeCheckLowReplication < checkInterval) {
2216      return;
2217    }
2218    // Will return immediately if we are in the middle of a WAL log roll currently.
2219    if (!rollWriterLock.tryLock()) {
2220      return;
2221    }
2222    try {
2223      lastTimeCheckLowReplication = now;
2224      if (doCheckLogLowReplication()) {
2225        requestLogRoll(LOW_REPLICATION);
2226      }
2227    } finally {
2228      rollWriterLock.unlock();
2229    }
2230  }
2231
  // Allow temporarily skipping the creation of the remote writer. When failing to write to the
  // remote dfs cluster, we need to reopen the regions and switch to use the original wal writer.
  // But we need to write a close marker when closing a region, and if that fails, the whole rs
  // will abort. So here we need to skip the creation of the remote writer and make it possible to
  // write the region close marker.
  // Setting markerEditOnly to true is for transitioning from A to S, where we need to give up
  // writing any pending wal entries as they will be discarded. The remote cluster will replicate
  // the correct data back later. We still need to allow writing marker edits, such as the close
  // region event, to allow closing a region.
2241  @Override
2242  public void skipRemoteWAL(boolean markerEditOnly) {
2243    if (markerEditOnly) {
2244      this.markerEditOnly = true;
2245    }
2246    this.skipRemoteWAL = true;
2247  }
2248
2249  private static void split(final Configuration conf, final Path p) throws IOException {
2250    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
2251    if (!fs.exists(p)) {
2252      throw new FileNotFoundException(p.toString());
2253    }
2254    if (!fs.getFileStatus(p).isDirectory()) {
2255      throw new IOException(p + " is not a directory");
2256    }
2257
2258    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
2259    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
2260    if (
2261      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
2262        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
2263    ) {
2264      archiveDir = new Path(archiveDir, p.getName());
2265    }
2266    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
2267  }
2268
2269  W getWriter() {
2270    return this.writer;
2271  }
2272
2273  private static void usage() {
2274    System.err.println("Usage: AbstractFSWAL <ARGS>");
2275    System.err.println("Arguments:");
2276    System.err.println(" --dump  Dump textual representation of passed one or more files");
2277    System.err.println("         For example: "
2278      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
2279    System.err.println(" --split Split the passed directory of WAL logs");
2280    System.err.println(
2281      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
2282  }
2283
2284  /**
2285   * Pass one or more log file names, and it will either dump out a text version on
2286   * <code>stdout</code> or split the specified log files.
2287   */
2288  public static void main(String[] args) throws IOException {
2289    if (args.length < 2) {
2290      usage();
2291      System.exit(-1);
2292    }
2293    // either dump using the WALPrettyPrinter or split, depending on args
2294    if (args[0].compareTo("--dump") == 0) {
2295      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
2296    } else if (args[0].compareTo("--perf") == 0) {
2297      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
2298      LOG.error(HBaseMarkers.FATAL,
2299        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
2300      System.exit(-1);
2301    } else if (args[0].compareTo("--split") == 0) {
2302      Configuration conf = HBaseConfiguration.create();
2303      for (int i = 1; i < args.length; i++) {
2304        try {
2305          Path logPath = new Path(args[i]);
2306          CommonFSUtils.setFsDefault(conf, logPath);
2307          split(conf, logPath);
2308        } catch (IOException t) {
2309          t.printStackTrace(System.err);
2310          System.exit(-1);
2311        }
2312      }
2313    } else {
2314      usage();
2315      System.exit(-1);
2316    }
2317  }
2318}