/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.lmax.disruptor.RingBuffer;
import io.opentelemetry.api.trace.Span;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.management.MemoryType;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one
 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.
 * This is done internally by the implementation.
 * <p>
 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
 * A bunch of work in the below is done keeping account of these region sequence ids -- what is
 * flushed out to hfiles, and what is yet in WAL and in memory only.
 * <p>
 * It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
 * (smaller) than the most-recent flush.
 * <p>
 * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,
 * or call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
 * replication, where we may want to tail the active WAL file.
 * <h2>Failure Semantics</h2> If an exception occurs on append or sync, roll the WAL because the
 * current WAL is now a lame duck; any more appends or syncs will also fail with the same original
 * exception. If we have made successful appends to the WAL and we then are unable to sync them,
 * our current semantics are to return an error to the client saying the appends failed, but also
 * to abort the current context, usually the hosting server. We need to replay the WALs. <br>
 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the
 * client that the append failed. <br>
 * TODO: replication may pick up these last edits though they have been marked as failed append
 * (need to keep our own file lengths, not rely on HDFS).
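 * <p>
 * A minimal sketch of the expected lifecycle, assuming a concrete subclass such as FSHLog, with
 * the region info, key and edit construction elided (illustrative, not a complete example):
 *
 * <pre>
 * {@code
 * AbstractFSWAL<?> wal = ...; // usually obtained via a WALFactory/WALProvider
 * wal.init(); // rolls the writer, creating the first WAL file
 * long txid = wal.appendData(regionInfo, walKey, walEdit);
 * wal.sync(txid); // block until the append is durable
 * // ... more appends and syncs ...
 * wal.shutdown(); // stop accepting writes
 * wal.close(); // archive any remaining files
 * }
 * </pre>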
 */
@InterfaceAudience.Private
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);

  protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";
  protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
  protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";
  protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms
  protected static final String SLOW_SYNC_ROLL_THRESHOLD =
    "hbase.regionserver.wal.slowsync.roll.threshold";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings
  protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =
    "hbase.regionserver.wal.slowsync.roll.interval.ms";
  protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute

  public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
  protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min

  public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";

  public static final String MAX_LOGS = "hbase.regionserver.maxlogs";

  public static final String RING_BUFFER_SLOT_COUNT =
    "hbase.regionserver.wal.disruptor.event.count";

  public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";
  public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;

  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
    "hbase.regionserver.wal.avoid-local-writes";
  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;

  /**
   * file system instance
   */
  protected final FileSystem fs;

  /**
   * WAL directory, where all WAL files would be placed.
   */
  protected final Path walDir;

  /**
   * dir path where old logs are kept.
   */
  protected final Path walArchiveDir;

  /**
   * Matches just those wal files that belong to this wal instance.
   */
  protected final PathFilter ourFiles;

  /**
   * Prefix of a WAL file, usually the region server name it is hosted on.
   */
  protected final String walFilePrefix;

  /**
   * Suffix included on generated wal file names
   */
  protected final String walFileSuffix;

  /**
   * Prefix used when checking for wal membership.
   */
  protected final String prefixPathStr;

  protected final WALCoprocessorHost coprocessorHost;

  /**
   * conf object
   */
  protected final Configuration conf;

  protected final Abortable abortable;

  /** Listeners that are called on WAL events. */
  protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();

  /** Tracks the logs in the process of being closed. */
  protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();

  /**
   * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence
   * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has
   * facility for answering questions such as "Is it safe to GC a WAL?".
   */
  protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();

  protected final long slowSyncNs, rollOnSyncNs;
  protected final int slowSyncRollThreshold;
  protected final int slowSyncCheckInterval;
  protected final AtomicInteger slowSyncCount = new AtomicInteger();

  private final long walSyncTimeoutNs;

  // If bigger than this size, roll the log.
  protected final long logrollsize;

  /**
   * Block size to use writing files.
   */
  protected final long blocksize;

  /*
   * If more than this many logs, force a flush of the oldest region so that its oldest edit goes
   * to disk. If we let too many logs accumulate and then crash, replaying them will take forever.
   * Keep the number of logs tidy.
   */
  protected final int maxLogs;

  protected final boolean useHsync;

  /**
   * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock
   * is held. We don't just use synchronized because that results in bogus and tedious findbugs
   * warnings when it thinks synchronized controls writer thread safety. It is held when we are
   * actually rolling the log. It is checked when we are looking to see if we should roll the log or
   * not.
   */
  protected final ReentrantLock rollWriterLock = new ReentrantLock(true);

  // The timestamp (in ms) when the log file was created.
  protected final AtomicLong filenum = new AtomicLong(-1);

  // Number of transactions in the current Wal.
  protected final AtomicInteger numEntries = new AtomicInteger(0);

  /**
   * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass
   * WALEdit to background consumer thread, and the transaction id is the sequence number of the
   * corresponding entry in queue.
   */
  protected volatile long highestUnsyncedTxid = -1;

  /**
   * Updated to the transaction id of the last successful sync call. This can be less than
   * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in
   * for it.
   */
  protected final AtomicLong highestSyncedTxid = new AtomicLong(0);

  /**
   * The total size of wal
   */
  protected final AtomicLong totalLogSize = new AtomicLong(0);
  /**
   * Current log file.
   */
  volatile W writer;

  // Last time to check low replication on hlog's pipeline
  private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();

  // Last time we asked to roll the log due to a slow sync
  private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();

  protected volatile boolean closed = false;

  protected final AtomicBoolean shutdown = new AtomicBoolean(false);

  protected final long walShutdownTimeout;

  /**
   * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
   * an IllegalArgumentException if used to compare paths from different wals.
   */
  final Comparator<Path> LOG_NAME_COMPARATOR =
    (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

  private static final class WALProps {

    /**
     * Map the encoded region name to the highest sequence id.
     * <p/>
     * Contains all the regions it has an entry for.
     */
    private final Map<byte[], Long> encodedName2HighestSequenceId;

    /**
     * The log file size. Notice that the size may not be accurate if we do asynchronous close in
     * subclasses.
     */
    private final long logSize;

    /**
     * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the
     * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
     * for safety.
     */
    private volatile boolean closed = false;

    WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
      this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
      this.logSize = logSize;
    }
  }

  /**
   * Map of WAL log file to properties. The map is sorted by the log file creation timestamp
   * (contained in the log file name).
   */
  protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
    new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

  /**
   * A cache of sync futures reused by threads.
   */
  protected final SyncFutureCache syncFutureCache;

  /**
   * The class name of the runtime implementation, used as prefix for logging/tracing.
   * <p>
   * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
   * refer to HBASE-17676 for more details
   * </p>
   */
  protected final String implClassName;

  protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

  protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

  // Run in the caller's thread if we get a RejectedExecutionException, to avoid aborting the
  // region server. Usually this should not happen, but let's make the code more robust.
  private final ExecutorService logArchiveExecutor =
    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
      new ThreadPoolExecutor.CallerRunsPolicy());

  private final int archiveRetries;

  public long getFilenum() {
    return this.filenum.get();
  }

  /**
   * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper
   * method returns the creation timestamp from a given log file. It extracts the timestamp assuming
   * the filename was created with the {@link #computeFilename(long filenum)} method.
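   * <p>
   * For example, a WAL named {@code .../WALs/srv%2C16020%2C1/srv%2C16020%2C1.1600000000000} (an
   * illustrative, URL-encoded server-name prefix with an empty suffix) yields
   * {@code 1600000000000}.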
   * @return timestamp, as in the log file name.
   */
  protected long getFileNumFromFileName(Path fileName) {
    checkNotNull(fileName, "file name can't be null");
    if (!ourFiles.accept(fileName)) {
      throw new IllegalArgumentException(
        "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");
    }
    final String fileNameString = fileName.toString();
    String chompedPath = fileNameString.substring(prefixPathStr.length(),
      (fileNameString.length() - walFileSuffix.length()));
    return Long.parseLong(chompedPath);
  }

  private int calculateMaxLogFiles(Configuration conf, long logRollSize) {
    checkArgument(logRollSize > 0,
      "The log roll size cannot be zero or negative when calculating max log files, "
        + "current value is " + logRollSize);
    Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf);
    return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize);
  }

  // must be power of 2
  protected final int getPreallocatedEventCount() {
    // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will
    // be stuck and make no progress if the buffer is filled with appends only and there is no
    // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
    // before they return.
    int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
    checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
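    // Round up to the nearest power of 2, e.g. 10000 becomes 16384, while 16384 stays as-is.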
    int floor = Integer.highestOneBit(preallocatedEventCount);
    if (floor == preallocatedEventCount) {
      return floor;
    }
    // max capacity is 1 << 30
    if (floor >= 1 << 29) {
      return 1 << 30;
    }
    return floor << 1;
  }

  protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir,
    final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
    final boolean failIfWALExists, final String prefix, final String suffix)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
  }

  protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,
    final String logDir, final String archiveDir, final Configuration conf,
    final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,
    final String suffix) throws FailedLogCloseException, IOException {
    this.fs = fs;
    this.walDir = new Path(rootDir, logDir);
    this.walArchiveDir = new Path(rootDir, archiveDir);
    this.conf = conf;
    this.abortable = abortable;

    if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {
      throw new IOException("Unable to mkdir " + walDir);
    }

    if (!fs.exists(this.walArchiveDir)) {
      if (!fs.mkdirs(this.walArchiveDir)) {
        throw new IOException("Unable to mkdir " + this.walArchiveDir);
      }
    }

    // If prefix is null or empty then just name it wal
    this.walFilePrefix =
      prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
    // we only correctly differentiate suffixes when numeric ones start with '.'
    if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
      throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER
        + "' but instead was '" + suffix + "'");
    }
    // Now that it exists, set the storage policy for the entire directory of wal files related to
    // this FSHLog instance
    String storagePolicy =
      conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);
    CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);
    this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();

    this.ourFiles = new PathFilter() {
      @Override
      public boolean accept(final Path fileName) {
        // The path should start with dir/<prefix> and end with our suffix
        final String fileNameString = fileName.toString();
        if (!fileNameString.startsWith(prefixPathStr)) {
          return false;
        }
        if (walFileSuffix.isEmpty()) {
          // in the case of the null suffix, we need to ensure the filename ends with a timestamp.
          return org.apache.commons.lang3.StringUtils
            .isNumeric(fileNameString.substring(prefixPathStr.length()));
        } else if (!fileNameString.endsWith(walFileSuffix)) {
          return false;
        }
        return true;
      }
    };

    if (failIfWALExists) {
      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
      if (null != walFiles && 0 != walFiles.length) {
        throw new IOException("Target WAL already exists within directory " + walDir);
      }
    }

    // Register listeners. TODO: Should this exist anymore? We have CPs?
    if (listeners != null) {
      for (WALActionsListener i : listeners) {
        registerWALActionsListener(i);
      }
    }
    this.coprocessorHost = new WALCoprocessorHost(this, conf);

    // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block
    // size should make it so WAL rolls before we get to the end-of-block (block transitions cost
    // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of
    // the block size, but experience from the field has it that this was not enough time for the
    // roll to happen before end-of-block. So, to keep WALs about the same size as those made in
    // hbase-1 (to prevent surprise), we now default the block size to 2 times the DFS default:
    // i.e. 2 * DFS default block size rolling at 50% full will generally make similar size logs to
    // 1 * DFS default block size rolling at 95% full. See HBASE-19148.
    this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
    float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
    this.logrollsize = (long) (this.blocksize * multiplier);
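    // Worked example with defaults: a 256 MB WAL block size (2 * the usual 128 MB DFS block size)
    // and the default 0.5 multiplier give a roll size of roughly 128 MB.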
    this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

    LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="
      + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="
      + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir
      + ", maxLogs=" + this.maxLogs);
    this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS,
      conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)));
    this.rollOnSyncNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));
    this.slowSyncRollThreshold =
      conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);
    this.slowSyncCheckInterval =
      conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
    this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
      conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)));
    this.syncFutureCache = new SyncFutureCache(conf);
    this.implClassName = getClass().getSimpleName();
    this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
    archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);
    this.walShutdownTimeout =
      conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);
  }

  /**
   * Used to initialize the WAL. Usually this just calls rollWriter to create the first log writer.
   */
  public void init() throws IOException {
    rollWriter();
  }

  @Override
  public void registerWALActionsListener(WALActionsListener listener) {
    this.listeners.add(listener);
  }

  @Override
  public boolean unregisterWALActionsListener(WALActionsListener listener) {
    return this.listeners.remove(listener);
  }

  @Override
  public WALCoprocessorHost getCoprocessorHost() {
    return coprocessorHost;
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
  }

  @Override
  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
  }

  @Override
  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
  }

  @Override
  public void abortCacheFlush(byte[] encodedRegionName) {
    this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
    // Used by tests. Deprecated as too subtle for general usage.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
  }

  @Override
  public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
    // This method is used by tests and for figuring if we should flush or not because our
    // sequenceids are too old. It is also used when reporting to the master our oldest sequenceid,
    // for use in figuring what edits can be skipped during log recovery.
    // getEarliestMemStoreSequenceId from this.sequenceIdAccounting looks first in
    // flushingOldestStoreSequenceIds, the currently flushing sequence ids, and if anything is
    // found there, returns it. This is the right thing to do when reporting the oldest
    // sequenceids to the master; we won't skip edits if we crash during the flush. For figuring
    // what to flush, we might get requeued if our sequence id is old even though we are currently
    // flushing. This may mean we do too much flushing.
    return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {
    return rollWriter(false);
  }

  @Override
  public final void sync() throws IOException {
    sync(useHsync);
  }

  @Override
  public final void sync(long txid) throws IOException {
    sync(txid, useHsync);
  }

  @Override
  public final void sync(boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
  }

  @Override
  public final void sync(long txid, boolean forceSync) throws IOException {
    TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
  }

  protected abstract void doSync(boolean forceSync) throws IOException;

  protected abstract void doSync(long txid, boolean forceSync) throws IOException;

  /**
   * This is a convenience method that computes a new filename with a given file-number.
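   * <p>
   * For example, with prefix {@code wal}, delimiter {@code .} and an empty suffix,
   * {@code computeFilename(1600000000000L)} yields {@code <walDir>/wal.1600000000000}
   * (illustrative values).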
   * @param filenum to use
   */
  protected Path computeFilename(final long filenum) {
    if (filenum < 0) {
      throw new RuntimeException("WAL file number can't be < 0");
    }
    String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;
    return new Path(walDir, child);
  }

  /**
   * This is a convenience method that computes a new filename using the current WAL file-number.
   */
  public Path getCurrentFileName() {
    return computeFilename(this.filenum.get());
  }

  /**
   * Retrieve the next path to use for writing. Increments the internal filenum.
   */
  private Path getNewPath() throws IOException {
    this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));
    Path newPath = getCurrentFileName();
    return newPath;
  }

  public Path getOldPath() {
    long currentFilenum = this.filenum.get();
    Path oldPath = null;
    if (currentFilenum > 0) {
      // ComputeFilename will take care of meta wal filename
      oldPath = computeFilename(currentFilenum);
    } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine?
    return oldPath;
  }

  /**
   * Tell listeners about pre log roll.
   */
  private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    coprocessorHost.preWALRoll(oldPath, newPath);

    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogRoll(oldPath, newPath);
      }
    }
  }

  /**
   * Tell listeners about post log roll.
   */
  private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
    throws IOException {
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogRoll(oldPath, newPath);
      }
    }

    coprocessorHost.postWALRoll(oldPath, newPath);
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of rolled log files */
  public int getNumRolledLogFiles() {
    return walFile2Props.size();
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the number of log files in use */
  public int getNumLogFiles() {
    // +1 for the currently in-use log
    return getNumRolledLogFiles() + 1;
  }

  /**
   * If the number of un-archived WAL files ('live' WALs) is greater than the maximum allowed,
   * check the first (oldest) WAL, and return those regions which should be flushed so that the
   * WAL can be let go/'archived'.
   * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file
   */
  Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
    Map<byte[], List<byte[]>> regions = null;
    int logCount = getNumRolledLogFiles();
    if (logCount > this.maxLogs && logCount > 0) {
      Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
      regions =
        this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
    }
    if (regions != null) {
      List<String> listForPrint = new ArrayList<>();
      for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {
        StringBuilder families = new StringBuilder();
        for (int i = 0; i < r.getValue().size(); i++) {
          if (i > 0) {
            families.append(",");
          }
          families.append(Bytes.toString(r.getValue().get(i)));
        }
        listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");
      }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs
        + "; forcing (partial) flush of " + regions.size() + " region(s): "
        + StringUtils.join(",", listForPrint));
    }
    return regions;
  }

  /**
   * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
   */
  protected final void markClosedAndClean(Path path) {
    WALProps props = walFile2Props.get(path);
    // typically this should not be null, but there is no big issue if it is already null, so
    // let's make the code more robust
    if (props != null) {
      props.closed = true;
      cleanOldLogs();
    }
  }

  /**
   * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
   * <p/>
   * Use synchronized because we may call this method in different threads, normally when replacing
   * the writer, and since closing the writer may now be asynchronous, we will also call this
   * method in the closeExecutor, right after we actually close a WAL writer.
   */
  private synchronized void cleanOldLogs() {
    List<Pair<Path, Long>> logsToArchive = null;
    // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids
    // are older than what is currently in memory, the WAL can be GC'd.
    for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
      if (!e.getValue().closed) {
        LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
        continue;
      }
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
      if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
        if (logsToArchive == null) {
          logsToArchive = new ArrayList<>();
        }
        logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
        if (LOG.isTraceEnabled()) {
          LOG.trace("WAL file ready for archiving " + log);
        }
      }
    }

    if (logsToArchive != null) {
      final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
      // make it async
      for (Pair<Path, Long> log : localLogsToArchive) {
        logArchiveExecutor.execute(() -> {
          archive(log);
        });
        this.walFile2Props.remove(log.getFirst());
      }
    }
  }

  protected void archive(final Pair<Path, Long> log) {
    totalLogSize.addAndGet(-log.getSecond());
    int retry = 1;
    while (true) {
      try {
        archiveLogFile(log.getFirst());
        // successful
        break;
      } catch (Throwable e) {
        if (retry > archiveRetries) {
787          LOG.error("Failed log archiving for the log {},", log.getFirst(), e);
          if (this.abortable != null) {
            this.abortable.abort("Failed log archiving", e);
            break;
          }
        } else {
          LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);
        }
        retry++;
      }
    }
  }

  /**
   * Only public so WALSplitter can use.
   * @return archived location of a WAL file with the given path p
   */
  public static Path getWALArchivePath(Path archiveDir, Path p) {
    return new Path(archiveDir, p.getName());
  }

  protected void archiveLogFile(final Path p) throws IOException {
    Path newPath = getWALArchivePath(this.walArchiveDir, p);
    // Tell our listeners that a log is going to be archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.preLogArchive(p, newPath);
      }
    }
    LOG.info("Archiving " + p + " to " + newPath);
    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
      throw new IOException("Unable to rename " + p + " to " + newPath);
    }
    // Tell our listeners that a log has been archived.
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.postLogArchive(p, newPath);
      }
    }
  }

  protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
    int oldNumEntries = this.numEntries.getAndSet(0);
    String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
    if (oldPath != null) {
      this.walFile2Props.put(oldPath,
        new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
      this.totalLogSize.addAndGet(oldFileLen);
      LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
        CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
        newPathString);
    } else {
      LOG.info("New WAL {}", newPathString);
    }
  }

  private Span createSpan(String name) {
    return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);
  }

  /**
   * Cleans up the current writer, closing it, and then puts in place the passed-in
   * {@code nextWriter}.
   * <p/>
   * <ul>
   * <li>In the case of creating a new WAL, oldPath will be null.</li>
   * <li>In the case of rolling over from one file to the next, none of the parameters will be null.
   * </li>
   * <li>In the case of closing out this FSHLog with no further use, newPath and nextWriter will be
   * null.</li>
   * </ul>
   * @param oldPath    may be null
   * @param newPath    may be null
   * @param nextWriter may be null
   * @return the passed in <code>newPath</code>
   * @throws IOException if there is a problem flushing or closing the underlying FS
   */
  Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
    return TraceUtil.trace(() -> {
      doReplaceWriter(oldPath, newPath, nextWriter);
      return newPath;
    }, () -> createSpan("WAL.replaceWriter"));
  }

  protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
    // Now we have published to the ring buffer, halt the current thread until we get an answer
    // back.
    try {
      if (syncFuture != null) {
        if (closed) {
          throw new IOException("WAL has been closed");
        } else {
          syncFuture.get(walSyncTimeoutNs);
        }
      }
    } catch (TimeoutIOException tioe) {
      throw new WALSyncTimeoutIOException(tioe);
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted", ie);
      throw convertInterruptedExceptionToIOException(ie);
    } catch (ExecutionException e) {
      throw ensureIOException(e.getCause());
    }
  }

  private static IOException ensureIOException(final Throwable t) {
    return (t instanceof IOException) ? (IOException) t : new IOException(t);
  }

  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
    Thread.currentThread().interrupt();
    IOException ioe = new InterruptedIOException();
    ioe.initCause(ie);
    return ioe;
  }

  private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      Map<byte[], List<byte[]>> regionsToFlush = null;
      if (this.closed) {
        LOG.debug("WAL closed. Skipping rolling of writer");
        return regionsToFlush;
      }
      try {
        Path oldPath = getOldPath();
        Path newPath = getNewPath();
        // Any exception from here on is catastrophic, non-recoverable, so we currently abort.
        W nextWriter = this.createWriterInstance(newPath);
        tellListenersAboutPreLogRoll(oldPath, newPath);
        // NewPath could be equal to oldPath if replaceWriter fails.
        newPath = replaceWriter(oldPath, newPath, nextWriter);
        tellListenersAboutPostLogRoll(oldPath, newPath);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Create new " + implClassName + " writer with pipeline: "
            + Arrays.toString(getPipeline()));
        }
        // We got a new writer, so reset the slow sync count
        lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();
        slowSyncCount.set(0);
        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
        // If the underlying FileSystem can't do what we ask, treat as IO failure, so
        // we'll abort.
        throw new IOException(
          "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",
          exception);
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {
    return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));
  }

  // public only until class moves to o.a.h.h.wal
  /** Returns the size of log files in use */
  public long getLogFileSize() {
    return this.totalLogSize.get();
  }

  // public only until class moves to o.a.h.h.wal
  public void requestLogRoll() {
    requestLogRoll(ERROR);
  }

  /**
   * Get the backing files associated with this WAL.
   * @return may be null if there are no files.
   */
  FileStatus[] getFiles() throws IOException {
    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
  }

  @Override
  public void shutdown() throws IOException {
    if (!shutdown.compareAndSet(false, true)) {
      return;
    }
    closed = true;
    // Tell our listeners that the log is closing
    if (!this.listeners.isEmpty()) {
      for (WALActionsListener i : this.listeners) {
        i.logCloseRequested();
      }
    }

    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());

    Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
          try {
            doShutdown();
            if (syncFutureCache != null) {
              syncFutureCache.clear();
            }
          } finally {
            rollWriterLock.unlock();
          }
        } else {
          throw new IOException("Waiting for rollWriterLock timeout");
        }
        return null;
      }
    });
    shutdownExecutor.shutdown();

    try {
      future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
    } catch (TimeoutException e) {
      throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
        + " the shutdown of WAL doesn't complete! Please check the status of underlying "
        + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
        + "\"", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      } else {
        throw new IOException(e.getCause());
      }
    } finally {
      // In shutdown, we may call cleanOldLogs, so shut down this executor at the end.
      // In the sync replication implementation, we may shut down a WAL without shutting down the
      // whole region server; if we shut down this executor earlier we may get a rejected
      // execution exception.
      logArchiveExecutor.shutdown();
    }
    // We also need to wait for logArchive to finish if we want a graceful shutdown, as we may
    // still have some pending archiving tasks not finished yet, and in close we may archive all
    // the remaining WAL files; there could be a race if we do not wait for the background archive
    // task to finish
    try {
      if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
        throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
          + " the shutdown of WAL doesn't complete! Please check the status of underlying "
          + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
          + "\"");
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
    }
  }

  @Override
  public void close() throws IOException {
    shutdown();
    final FileStatus[] files = getFiles();
    if (null != files && 0 != files.length) {
      for (FileStatus file : files) {
        Path p = getWALArchivePath(this.walArchiveDir, file.getPath());
        // Tell our listeners that a log is going to be archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogArchive(file.getPath(), p);
          }
        }

        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
          throw new IOException("Unable to rename " + file.getPath() + " to " + p);
        }
        // Tell our listeners that a log was archived.
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogArchive(file.getPath(), p);
          }
        }
      }
      LOG.debug(
        "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));
    }
    LOG.info("Closed WAL: " + toString());
  }

  /** Returns number of WALs currently in the process of closing. */
  public int getInflightWALCloseCount() {
    return inflightWALClosures.size();
  }

  /**
   * Updates the sequence number of a specific store. Depending on the flag, either replaces the
   * current sequence number only if the given one is bigger, or replaces it even if it is lower
   * than the existing one.
   */
  @Override
  public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
    boolean onlyIfGreater) {
    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
  }

  protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
    return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);
  }

  protected boolean isLogRollRequested() {
    return rollRequested.get();
  }

  protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {
    // If we have already requested a roll, don't do it again
    // And only set rollRequested to true when there is a registered listener
    if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {
      for (WALActionsListener i : this.listeners) {
        i.logRollRequested(reason);
      }
    }
  }

  long getUnflushedEntriesCount() {
    long highestSynced = this.highestSyncedTxid.get();
    long highestUnsynced = this.highestUnsyncedTxid;
    return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;
  }

  boolean isUnflushedEntries() {
    return getUnflushedEntriesCount() > 0;
  }

  /**
   * Exposed for testing only. Used for tricks like halting the ring buffer appending.
   */
  protected void atHeadOfRingBufferEventHandlerAppend() {
    // Noop
  }

  protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {
    // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
    atHeadOfRingBufferEventHandlerAppend();
    long start = EnvironmentEdgeManager.currentTime();
    byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
    long regionSequenceId = entry.getKey().getSequenceId();

    // Edits are empty, there is nothing to append. Maybe empty when we are looking for a
    // region sequence id only, a region edit/sequence id that is not associated with an actual
    // edit. It has to go through all the rigmarole to be sure we have the right ordering.
    if (entry.getEdit().isEmpty()) {
      return false;
    }

    // Coprocessor hook.
    coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    if (!listeners.isEmpty()) {
      for (WALActionsListener i : listeners) {
        i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
      }
    }
    doAppend(writer, entry);
    assert highestUnsyncedTxid < entry.getTxid();
    highestUnsyncedTxid = entry.getTxid();
    if (entry.isCloseRegion()) {
      // let's clean all the records of this region
      sequenceIdAccounting.onRegionClose(encodedRegionName);
    } else {
      sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
        entry.isInMemStore());
    }
    coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
    // Update metrics.
    postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
    numEntries.incrementAndGet();
    return true;
  }

  private long postAppend(final Entry e, final long elapsedTime) throws IOException {
    long len = 0;
    if (!listeners.isEmpty()) {
      for (Cell cell : e.getEdit().getCells()) {
        len += PrivateCellUtil.estimatedSerializedSizeOf(cell);
      }
      for (WALActionsListener listener : listeners) {
        listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());
      }
    }
    return len;
  }

  protected final void postSync(final long timeInNanos, final int handlerSyncs) {
    if (timeInNanos > this.slowSyncNs) {
      String msg = new StringBuilder().append("Slow sync cost: ")
        .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")
        .append(Arrays.toString(getPipeline())).toString();
      LOG.info(msg);
      // A single sync took too long.
      // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative
      // effects. Here we have a single data point that indicates we should take immediate
      // action, so do so.
      if (timeInNanos > this.rollOnSyncNs) {
        LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="
          + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="
          + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "
          + Arrays.toString(getPipeline()));
        requestLogRoll(SLOW_SYNC);
      }
      slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this
    }
    if (!listeners.isEmpty()) {
      for (WALActionsListener listener : listeners) {
        listener.postSync(timeInNanos, handlerSyncs);
      }
    }
  }

  protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {
    if (this.closed) {
      throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
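    // Claim the ring buffer sequence inside mvcc.begin() so that mvcc write-entry order matches
    // ring buffer publish order; the claimed sequence doubles as the WAL transaction id we return.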
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
    try {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

  @Override
  public String toString() {
    return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";
  }

  /**
   * If the given {@code path} is currently being written, then return its length.
   * <p>
   * This is used by replication to prevent replicating unacked log entries. See
   * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
   */
  @Override
  public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
    rollWriterLock.lock();
    try {
      Path currentPath = getOldPath();
      if (path.equals(currentPath)) {
        // Currently active path.
        W writer = this.writer;
        return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
      } else {
        W temp = inflightWALClosures.get(path.getName());
        if (temp != null) {
          // In the process of being closed, trailer bytes may or may not be flushed.
          // Ensuring that we read all the bytes in a file is critical for correctness of tailing
          // use cases like replication, see HBASE-25924/HBASE-25932.
          return OptionalLong.of(temp.getSyncedLength());
        }
        // Log rolled successfully.
        return OptionalLong.empty();
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  @Override
  public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, true),
      () -> createSpan("WAL.appendData"));
  }

  @Override
  public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
    return TraceUtil.trace(() -> append(info, key, edits, false),
      () -> createSpan("WAL.appendMarker"));
  }

  /**
   * Append a set of edits to the WAL.
   * <p/>
   * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
   * have its region edit/sequence id assigned else it messes up our unification of mvcc and
   * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.
   * <p/>
   * NOTE: This append, at a time that is usually after this call returns, starts an mvcc
   * transaction by calling 'begin', wherein we assign this update a sequenceid. At assignment
   * time, we stamp all the passed-in Cells inside the WALEdit with their sequenceId. You must
   * 'complete' this mvcc transaction by calling MultiVersionConcurrencyControl#complete(...) or a
   * variant, otherwise mvcc will get stuck. Do it in the finally of a try/finally block within
   * which this append lives, along with any subsequent operations like sync or update of
   * memstore, etc. Get the WriteEntry to pass to mvcc out of the passed-in WALKey
   * <code>walKey</code> parameter. Be warned that the WriteEntry is not immediately available on
   * return from this method. It WILL be available subsequent to a sync of this append; otherwise,
   * you will just have to wait on the WriteEntry to get filled in.
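   * <p>
   * A sketch of the expected calling pattern, simplified from how HRegion drives it (error
   * handling elided; the key/edit construction is not shown):
   *
   * <pre>
   * {@code
   * long txid = wal.appendData(regionInfo, walKey, edits);
   * try {
   *   wal.sync(txid);
   * } finally {
   *   MultiVersionConcurrencyControl.WriteEntry we = walKey.getWriteEntry();
   *   if (we != null) {
   *     walKey.getMvcc().complete(we);
   *   }
   * }
   * }
   * </pre>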
1289   * @param info       the regioninfo associated with append
1290   * @param key        Modified by this call; we add to it this edits region edit/sequence id.
1291   * @param edits      Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
1292   *                   sequence id that is after all currently appended edits.
1293   * @param inMemstore Always true except for case where we are writing a region event meta marker
1294   *                   edit, for example, a compaction completion record into the WAL or noting a
1295   *                   Region Open event. In these cases the entry is just so we can finish an
1296   *                   unfinished compaction after a crash when the new Server reads the WAL on
1297   *                   recovery, etc. These transition event 'Markers' do not go via the memstore.
1298   *                   When memstore is false, we presume a Marker event edit.
1299   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
1300   *         in it.
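   *         <p/>
   *         A hedged sketch of the expected calling pattern ({@code mvcc} below stands for the
   *         region's MultiVersionConcurrencyControl and is illustrative, not a field of this
   *         class):
   *
   *         <pre>
   * long txid = wal.appendData(regionInfo, walKey, edits);
   * try {
   *   wal.sync(txid); // make the append durable; the WriteEntry is available after this
   * } finally {
   *   MultiVersionConcurrencyControl.WriteEntry we = walKey.getWriteEntry();
   *   if (we != null) {
   *     mvcc.complete(we); // always complete the transaction or mvcc will get stuck
   *   }
   * }
   *         </pre>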
   */
  protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException;

  protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

  protected abstract W createWriterInstance(Path path)
    throws IOException, CommonFSUtils.StreamLacksCapabilityException;

  /**
   * Note that you need to clear the {@link #rollRequested} flag in this method, as the new writer
   * will begin to work before this method returns. If we cleared the flag only after returning
   * from this call, we could miss a roll request. The implementation class should choose a proper
   * place to clear the {@link #rollRequested} flag so that we do not miss a roll request,
   * typically just before it starts writing to the new writer.
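   * <p/>
   * A hedged sketch of one safe ordering inside an implementation (the steps are illustrative;
   * concrete subclasses differ in the details):
   * <ol>
   * <li>drain or fail outstanding syncs against the old writer</li>
   * <li>clear {@link #rollRequested}, then install {@code nextWriter} and resume appends</li>
   * <li>close the old writer, possibly asynchronously (see {@code inflightWALClosures})</li>
   * </ol>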
   */
  protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
    throws IOException;

  protected abstract void doShutdown() throws IOException;

  protected abstract boolean doCheckLogLowReplication();

  /** Returns true if we exceeded the slow sync roll threshold over the last check interval */
  protected boolean doCheckSlowSync() {
    boolean result = false;
    long now = EnvironmentEdgeManager.currentTime();
    long elapsedTime = now - lastTimeCheckSlowSync;
    if (elapsedTime >= slowSyncCheckInterval) {
      if (slowSyncCount.get() >= slowSyncRollThreshold) {
        if (elapsedTime >= (2 * slowSyncCheckInterval)) {
          // If two or more slowSyncCheckIntervals have elapsed, this is a corner case where a
          // train of slow syncs almost triggered us, but then a long quiet interval passed
          // before the one more sync that pushed us over. In that case we should do nothing
          // and let the count reset.
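          // Worked example with illustrative numbers: if slowSyncCheckInterval = 10000 ms and
          // slowSyncRollThreshold = 100, and the 100th slow sync lands 25000 ms after the last
          // check, then elapsedTime (25000) >= 2 * 10000, so the burst is considered stale: we
          // log at DEBUG below, skip the roll, and reset the count. Had elapsedTime been
          // 15000 ms, we would request a roll instead.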
          if (LOG.isDebugEnabled()) {
            LOG.debug("checkSlowSync triggered but we decided to ignore it; count="
              + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="
              + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");
          }
          // Fall through to the count reset below
        } else {
          LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="
            + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "
            + Arrays.toString(getPipeline()));
          result = true;
        }
      }
      lastTimeCheckSlowSync = now;
      slowSyncCount.set(0);
    }
    return result;
  }

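  /**
   * Check whether the WAL's datanode pipeline has dropped below the expected replication and, if
   * so, request a log roll. A hedged note on usage: this is intended to be called periodically
   * (typically by the WAL roller thread); the call is a no-op when the previous check ran within
   * the past {@code checkInterval} ms or when a roll is already in progress.
   */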
  public void checkLogLowReplication(long checkInterval) {
    long now = EnvironmentEdgeManager.currentTime();
    if (now - lastTimeCheckLowReplication < checkInterval) {
      return;
    }
    // Return immediately if a WAL roll is currently in progress.
    if (!rollWriterLock.tryLock()) {
      return;
    }
    try {
      lastTimeCheckLowReplication = now;
      if (doCheckLogLowReplication()) {
        requestLogRoll(LOW_REPLICATION);
      }
    } finally {
      rollWriterLock.unlock();
    }
  }

  /**
   * Gets the datanode pipeline for the current WAL.
   */
  abstract DatanodeInfo[] getPipeline();

  /**
   * Gets the datanode replication count for the current WAL.
   */
  abstract int getLogReplication();

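  /**
   * Split the WAL files under the passed directory {@code p}, moving processed files to the
   * archive directory. A hedged note: this backs the {@code --split} option of
   * {@link #main(String[])}; the heavy lifting is delegated to {@link WALSplitter#split}.
   */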
  private static void split(final Configuration conf, final Path p) throws IOException {
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    if (!fs.exists(p)) {
      throw new FileNotFoundException(p.toString());
    }
    if (!fs.getFileStatus(p).isDirectory()) {
      throw new IOException(p + " is not a directory");
    }

    final Path baseDir = CommonFSUtils.getWALRootDir(conf);
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (
      conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)
    ) {
      archiveDir = new Path(archiveDir, p.getName());
    }
    WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
  }

  W getWriter() {
    return this.writer;
  }

  private static void usage() {
    System.err.println("Usage: AbstractFSWAL <ARGS>");
    System.err.println("Arguments:");
    System.err.println(" --dump  Dump a textual representation of one or more passed WAL files");
    System.err.println("         For example: "
      + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
    System.err.println(" --split Split the passed directory of WAL logs");
    System.err.println(
      "         For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");
  }

  /**
   * Pass one or more log file names and this tool will either dump out a text version of each on
   * <code>stdout</code> or split the passed directory of WAL logs.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      usage();
      System.exit(-1);
    }
    // either dump using the WALPrettyPrinter or split, depending on args
    if (args[0].equals("--dump")) {
      WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
    } else if (args[0].equals("--perf")) {
      LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");
      LOG.error(HBaseMarkers.FATAL,
        "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);
      System.exit(-1);
    } else if (args[0].equals("--split")) {
      Configuration conf = HBaseConfiguration.create();
      for (int i = 1; i < args.length; i++) {
        try {
          Path logPath = new Path(args[i]);
          CommonFSUtils.setFsDefault(conf, logPath);
          split(conf, logPath);
        } catch (IOException e) {
          e.printStackTrace(System.err);
          System.exit(-1);
        }
      }
    } else {
      usage();
      System.exit(-1);
    }
  }
}