001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR; 021import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION; 022import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC; 023import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL; 024import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; 025import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; 026import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; 027 028import com.lmax.disruptor.RingBuffer; 029import io.opentelemetry.api.trace.Span; 030import java.io.FileNotFoundException; 031import java.io.IOException; 032import java.io.InterruptedIOException; 033import java.lang.management.MemoryType; 034import java.net.URLEncoder; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Comparator; 038import java.util.List; 039import java.util.Map; 040import java.util.OptionalLong; 041import java.util.Set; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentNavigableMap; 045import java.util.concurrent.ConcurrentSkipListMap; 046import java.util.concurrent.CopyOnWriteArrayList; 047import java.util.concurrent.ExecutionException; 048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.Executors; 050import java.util.concurrent.Future; 051import java.util.concurrent.LinkedBlockingQueue; 052import java.util.concurrent.ThreadPoolExecutor; 053import java.util.concurrent.TimeUnit; 054import java.util.concurrent.TimeoutException; 055import java.util.concurrent.atomic.AtomicBoolean; 056import java.util.concurrent.atomic.AtomicInteger; 057import java.util.concurrent.atomic.AtomicLong; 058import java.util.concurrent.locks.ReentrantLock; 059import org.apache.commons.lang3.mutable.MutableLong; 060import org.apache.hadoop.conf.Configuration; 061import org.apache.hadoop.fs.FileStatus; 062import org.apache.hadoop.fs.FileSystem; 063import org.apache.hadoop.fs.Path; 064import org.apache.hadoop.fs.PathFilter; 065import org.apache.hadoop.hbase.Abortable; 066import org.apache.hadoop.hbase.Cell; 067import org.apache.hadoop.hbase.HBaseConfiguration; 068import org.apache.hadoop.hbase.HConstants; 069import org.apache.hadoop.hbase.PrivateCellUtil; 070import org.apache.hadoop.hbase.client.RegionInfo; 071import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 072import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 073import org.apache.hadoop.hbase.ipc.RpcServer; 074import org.apache.hadoop.hbase.ipc.ServerCall; 075import org.apache.hadoop.hbase.log.HBaseMarkers; 076import org.apache.hadoop.hbase.regionserver.HRegion; 077import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 078import org.apache.hadoop.hbase.trace.TraceUtil; 079import org.apache.hadoop.hbase.util.Bytes; 080import org.apache.hadoop.hbase.util.CommonFSUtils; 081import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 082import org.apache.hadoop.hbase.util.Pair; 083import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 084import org.apache.hadoop.hbase.wal.WAL; 085import org.apache.hadoop.hbase.wal.WALEdit; 086import org.apache.hadoop.hbase.wal.WALFactory; 087import org.apache.hadoop.hbase.wal.WALKeyImpl; 088import org.apache.hadoop.hbase.wal.WALPrettyPrinter; 089import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; 090import org.apache.hadoop.hbase.wal.WALSplitter; 091import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 092import org.apache.hadoop.util.StringUtils; 093import org.apache.yetus.audience.InterfaceAudience; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 098 099/** 100 * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one 101 * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. 102 * This is done internal to the implementation. 103 * <p> 104 * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a 105 * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id. 106 * A bunch of work in the below is done keeping account of these region sequence ids -- what is 107 * flushed out to hfiles, and what is yet in WAL and in memory only. 108 * <p> 109 * It is only practical to delete entire files. Thus, we delete an entire on-disk file 110 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older 111 * (smaller) than the most-recent flush. 112 * <p> 113 * To read an WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for one way read, 114 * call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for 115 * replication where we may want to tail the active WAL file. 116 * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL 117 * is now a lame duck; any more appends or syncs will fail also with the same original exception. If 118 * we have made successful appends to the WAL and we then are unable to sync them, our current 119 * semantic is to return error to the client that the appends failed but also to abort the current 120 * context, usually the hosting server. We need to replay the WALs. <br> 121 * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client 122 * that the append failed. <br> 123 * TODO: replication may pick up these last edits though they have been marked as failed append 124 * (Need to keep our own file lengths, not rely on HDFS). 125 */ 126@InterfaceAudience.Private 127public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { 128 129 private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); 130 131 protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms"; 132 protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms 133 protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms"; 134 protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms 135 protected static final String SLOW_SYNC_ROLL_THRESHOLD = 136 "hbase.regionserver.wal.slowsync.roll.threshold"; 137 protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings 138 protected static final String SLOW_SYNC_ROLL_INTERVAL_MS = 139 "hbase.regionserver.wal.slowsync.roll.interval.ms"; 140 protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute 141 142 public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; 143 protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min 144 145 public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier"; 146 147 public static final String MAX_LOGS = "hbase.regionserver.maxlogs"; 148 149 public static final String RING_BUFFER_SLOT_COUNT = 150 "hbase.regionserver.wal.disruptor.event.count"; 151 152 public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms"; 153 public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000; 154 155 public static final String WAL_AVOID_LOCAL_WRITES_KEY = 156 "hbase.regionserver.wal.avoid-local-writes"; 157 public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false; 158 159 /** 160 * file system instance 161 */ 162 protected final FileSystem fs; 163 164 /** 165 * WAL directory, where all WAL files would be placed. 166 */ 167 protected final Path walDir; 168 169 /** 170 * dir path where old logs are kept. 171 */ 172 protected final Path walArchiveDir; 173 174 /** 175 * Matches just those wal files that belong to this wal instance. 176 */ 177 protected final PathFilter ourFiles; 178 179 /** 180 * Prefix of a WAL file, usually the region server name it is hosted on. 181 */ 182 protected final String walFilePrefix; 183 184 /** 185 * Suffix included on generated wal file names 186 */ 187 protected final String walFileSuffix; 188 189 /** 190 * Prefix used when checking for wal membership. 191 */ 192 protected final String prefixPathStr; 193 194 protected final WALCoprocessorHost coprocessorHost; 195 196 /** 197 * conf object 198 */ 199 protected final Configuration conf; 200 201 protected final Abortable abortable; 202 203 /** Listeners that are called on WAL events. */ 204 protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); 205 206 /** Tracks the logs in the process of being closed. */ 207 protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>(); 208 209 /** 210 * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence 211 * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has 212 * facility for answering questions such as "Is it safe to GC a WAL?". 213 */ 214 protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); 215 216 protected final long slowSyncNs, rollOnSyncNs; 217 protected final int slowSyncRollThreshold; 218 protected final int slowSyncCheckInterval; 219 protected final AtomicInteger slowSyncCount = new AtomicInteger(); 220 221 private final long walSyncTimeoutNs; 222 223 // If > than this size, roll the log. 224 protected final long logrollsize; 225 226 /** 227 * Block size to use writing files. 228 */ 229 protected final long blocksize; 230 231 /* 232 * If more than this many logs, force flush of oldest region to the oldest edit goes to disk. If 233 * too many and we crash, then will take forever replaying. Keep the number of logs tidy. 234 */ 235 protected final int maxLogs; 236 237 protected final boolean useHsync; 238 239 /** 240 * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock 241 * is held. We don't just use synchronized because that results in bogus and tedious findbugs 242 * warning when it thinks synchronized controls writer thread safety. It is held when we are 243 * actually rolling the log. It is checked when we are looking to see if we should roll the log or 244 * not. 245 */ 246 protected final ReentrantLock rollWriterLock = new ReentrantLock(true); 247 248 // The timestamp (in ms) when the log file was created. 249 protected final AtomicLong filenum = new AtomicLong(-1); 250 251 // Number of transactions in the current Wal. 252 protected final AtomicInteger numEntries = new AtomicInteger(0); 253 254 /** 255 * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass 256 * WALEdit to background consumer thread, and the transaction id is the sequence number of the 257 * corresponding entry in queue. 258 */ 259 protected volatile long highestUnsyncedTxid = -1; 260 261 /** 262 * Updated to the transaction id of the last successful sync call. This can be less than 263 * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in 264 * for it. 265 */ 266 protected final AtomicLong highestSyncedTxid = new AtomicLong(0); 267 268 /** 269 * The total size of wal 270 */ 271 protected final AtomicLong totalLogSize = new AtomicLong(0); 272 /** 273 * Current log file. 274 */ 275 volatile W writer; 276 277 // Last time to check low replication on hlog's pipeline 278 private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); 279 280 // Last time we asked to roll the log due to a slow sync 281 private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); 282 283 protected volatile boolean closed = false; 284 285 protected final AtomicBoolean shutdown = new AtomicBoolean(false); 286 287 protected final long walShutdownTimeout; 288 289 /** 290 * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws 291 * an IllegalArgumentException if used to compare paths from different wals. 292 */ 293 final Comparator<Path> LOG_NAME_COMPARATOR = 294 (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); 295 296 private static final class WALProps { 297 298 /** 299 * Map the encoded region name to the highest sequence id. 300 * <p/> 301 * Contains all the regions it has an entry for. 302 */ 303 private final Map<byte[], Long> encodedName2HighestSequenceId; 304 305 /** 306 * The log file size. Notice that the size may not be accurate if we do asynchronous close in 307 * subclasses. 308 */ 309 private final long logSize; 310 311 /** 312 * If we do asynchronous close in subclasses, it is possible that when adding WALProps to the 313 * rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file, 314 * for safety. 315 */ 316 private volatile boolean closed = false; 317 318 WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { 319 this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; 320 this.logSize = logSize; 321 } 322 } 323 324 /** 325 * Map of WAL log file to properties. The map is sorted by the log file creation timestamp 326 * (contained in the log file name). 327 */ 328 protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props = 329 new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); 330 331 /** 332 * A cache of sync futures reused by threads. 333 */ 334 protected final SyncFutureCache syncFutureCache; 335 336 /** 337 * The class name of the runtime implementation, used as prefix for logging/tracing. 338 * <p> 339 * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, 340 * refer to HBASE-17676 for more details 341 * </p> 342 */ 343 protected final String implClassName; 344 345 protected final AtomicBoolean rollRequested = new AtomicBoolean(false); 346 347 protected final ExecutorService closeExecutor = Executors.newCachedThreadPool( 348 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); 349 350 // Run in caller if we get reject execution exception, to avoid aborting region server when we get 351 // reject execution exception. Usually this should not happen but let's make it more robust. 352 private final ExecutorService logArchiveExecutor = 353 new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), 354 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(), 355 new ThreadPoolExecutor.CallerRunsPolicy()); 356 357 private final int archiveRetries; 358 359 public long getFilenum() { 360 return this.filenum.get(); 361 } 362 363 /** 364 * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper 365 * method returns the creation timestamp from a given log file. It extracts the timestamp assuming 366 * the filename is created with the {@link #computeFilename(long filenum)} method. 367 * @return timestamp, as in the log file name. 368 */ 369 protected long getFileNumFromFileName(Path fileName) { 370 checkNotNull(fileName, "file name can't be null"); 371 if (!ourFiles.accept(fileName)) { 372 throw new IllegalArgumentException( 373 "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); 374 } 375 final String fileNameString = fileName.toString(); 376 String chompedPath = fileNameString.substring(prefixPathStr.length(), 377 (fileNameString.length() - walFileSuffix.length())); 378 return Long.parseLong(chompedPath); 379 } 380 381 private int calculateMaxLogFiles(Configuration conf, long logRollSize) { 382 checkArgument(logRollSize > 0, 383 "The log roll size cannot be zero or negative when calculating max log files, " 384 + "current value is " + logRollSize); 385 Pair<Long, MemoryType> globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); 386 return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); 387 } 388 389 // must be power of 2 390 protected final int getPreallocatedEventCount() { 391 // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will 392 // be stuck and make no progress if the buffer is filled with appends only and there is no 393 // sync. If no sync, then the handlers will be outstanding just waiting on sync completion 394 // before they return. 395 int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16); 396 checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0"); 397 int floor = Integer.highestOneBit(preallocatedEventCount); 398 if (floor == preallocatedEventCount) { 399 return floor; 400 } 401 // max capacity is 1 << 30 402 if (floor >= 1 << 29) { 403 return 1 << 30; 404 } 405 return floor << 1; 406 } 407 408 protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, 409 final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, 410 final boolean failIfWALExists, final String prefix, final String suffix) 411 throws FailedLogCloseException, IOException { 412 this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); 413 } 414 415 protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, 416 final String logDir, final String archiveDir, final Configuration conf, 417 final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, 418 final String suffix) throws FailedLogCloseException, IOException { 419 this.fs = fs; 420 this.walDir = new Path(rootDir, logDir); 421 this.walArchiveDir = new Path(rootDir, archiveDir); 422 this.conf = conf; 423 this.abortable = abortable; 424 425 if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { 426 throw new IOException("Unable to mkdir " + walDir); 427 } 428 429 if (!fs.exists(this.walArchiveDir)) { 430 if (!fs.mkdirs(this.walArchiveDir)) { 431 throw new IOException("Unable to mkdir " + this.walArchiveDir); 432 } 433 } 434 435 // If prefix is null||empty then just name it wal 436 this.walFilePrefix = 437 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); 438 // we only correctly differentiate suffices when numeric ones start with '.' 439 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { 440 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER 441 + "' but instead was '" + suffix + "'"); 442 } 443 // Now that it exists, set the storage policy for the entire directory of wal files related to 444 // this FSHLog instance 445 String storagePolicy = 446 conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); 447 CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); 448 this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); 449 this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); 450 451 this.ourFiles = new PathFilter() { 452 @Override 453 public boolean accept(final Path fileName) { 454 // The path should start with dir/<prefix> and end with our suffix 455 final String fileNameString = fileName.toString(); 456 if (!fileNameString.startsWith(prefixPathStr)) { 457 return false; 458 } 459 if (walFileSuffix.isEmpty()) { 460 // in the case of the null suffix, we need to ensure the filename ends with a timestamp. 461 return org.apache.commons.lang3.StringUtils 462 .isNumeric(fileNameString.substring(prefixPathStr.length())); 463 } else if (!fileNameString.endsWith(walFileSuffix)) { 464 return false; 465 } 466 return true; 467 } 468 }; 469 470 if (failIfWALExists) { 471 final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); 472 if (null != walFiles && 0 != walFiles.length) { 473 throw new IOException("Target WAL already exists within directory " + walDir); 474 } 475 } 476 477 // Register listeners. TODO: Should this exist anymore? We have CPs? 478 if (listeners != null) { 479 for (WALActionsListener i : listeners) { 480 registerWALActionsListener(i); 481 } 482 } 483 this.coprocessorHost = new WALCoprocessorHost(this, conf); 484 485 // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block 486 // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost 487 // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of 488 // the block size but experience from the field has it that this was not enough time for the 489 // roll to happen before end-of-block. So the new accounting makes WALs of about the same 490 // size as those made in hbase-1 (to prevent surprise), we now have default block size as 491 // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally 492 // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. 493 this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir); 494 float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f); 495 this.logrollsize = (long) (this.blocksize * multiplier); 496 this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize))); 497 498 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" 499 + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" 500 + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir 501 + ", maxLogs=" + this.maxLogs); 502 this.slowSyncNs = TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, 503 conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS))); 504 this.rollOnSyncNs = TimeUnit.MILLISECONDS 505 .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS)); 506 this.slowSyncRollThreshold = 507 conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD); 508 this.slowSyncCheckInterval = 509 conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS); 510 this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, 511 conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS))); 512 this.syncFutureCache = new SyncFutureCache(conf); 513 this.implClassName = getClass().getSimpleName(); 514 this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); 515 archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0); 516 this.walShutdownTimeout = 517 conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS); 518 } 519 520 /** 521 * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. 522 */ 523 public void init() throws IOException { 524 rollWriter(); 525 } 526 527 @Override 528 public void registerWALActionsListener(WALActionsListener listener) { 529 this.listeners.add(listener); 530 } 531 532 @Override 533 public boolean unregisterWALActionsListener(WALActionsListener listener) { 534 return this.listeners.remove(listener); 535 } 536 537 @Override 538 public WALCoprocessorHost getCoprocessorHost() { 539 return coprocessorHost; 540 } 541 542 @Override 543 public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { 544 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); 545 } 546 547 @Override 548 public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) { 549 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); 550 } 551 552 @Override 553 public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) { 554 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId); 555 } 556 557 @Override 558 public void abortCacheFlush(byte[] encodedRegionName) { 559 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); 560 } 561 562 @Override 563 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { 564 // Used by tests. Deprecated as too subtle for general usage. 565 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); 566 } 567 568 @Override 569 public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { 570 // This method is used by tests and for figuring if we should flush or not because our 571 // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use 572 // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId 573 // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the 574 // currently flushing sequence ids, and if anything found there, it is returning these. This is 575 // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if 576 // we crash during the flush. For figuring what to flush, we might get requeued if our sequence 577 // id is old even though we are currently flushing. This may mean we do too much flushing. 578 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); 579 } 580 581 @Override 582 public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException { 583 return rollWriter(false); 584 } 585 586 @Override 587 public final void sync() throws IOException { 588 sync(useHsync); 589 } 590 591 @Override 592 public final void sync(long txid) throws IOException { 593 sync(txid, useHsync); 594 } 595 596 @Override 597 public final void sync(boolean forceSync) throws IOException { 598 TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync")); 599 } 600 601 @Override 602 public final void sync(long txid, boolean forceSync) throws IOException { 603 TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync")); 604 } 605 606 protected abstract void doSync(boolean forceSync) throws IOException; 607 608 protected abstract void doSync(long txid, boolean forceSync) throws IOException; 609 610 /** 611 * This is a convenience method that computes a new filename with a given file-number. 612 * @param filenum to use 613 */ 614 protected Path computeFilename(final long filenum) { 615 if (filenum < 0) { 616 throw new RuntimeException("WAL file number can't be < 0"); 617 } 618 String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; 619 return new Path(walDir, child); 620 } 621 622 /** 623 * This is a convenience method that computes a new filename with a given using the current WAL 624 * file-number 625 */ 626 public Path getCurrentFileName() { 627 return computeFilename(this.filenum.get()); 628 } 629 630 /** 631 * retrieve the next path to use for writing. Increments the internal filenum. 632 */ 633 private Path getNewPath() throws IOException { 634 this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime())); 635 Path newPath = getCurrentFileName(); 636 return newPath; 637 } 638 639 public Path getOldPath() { 640 long currentFilenum = this.filenum.get(); 641 Path oldPath = null; 642 if (currentFilenum > 0) { 643 // ComputeFilename will take care of meta wal filename 644 oldPath = computeFilename(currentFilenum); 645 } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? 646 return oldPath; 647 } 648 649 /** 650 * Tell listeners about pre log roll. 651 */ 652 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) 653 throws IOException { 654 coprocessorHost.preWALRoll(oldPath, newPath); 655 656 if (!this.listeners.isEmpty()) { 657 for (WALActionsListener i : this.listeners) { 658 i.preLogRoll(oldPath, newPath); 659 } 660 } 661 } 662 663 /** 664 * Tell listeners about post log roll. 665 */ 666 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) 667 throws IOException { 668 if (!this.listeners.isEmpty()) { 669 for (WALActionsListener i : this.listeners) { 670 i.postLogRoll(oldPath, newPath); 671 } 672 } 673 674 coprocessorHost.postWALRoll(oldPath, newPath); 675 } 676 677 // public only until class moves to o.a.h.h.wal 678 /** Returns the number of rolled log files */ 679 public int getNumRolledLogFiles() { 680 return walFile2Props.size(); 681 } 682 683 // public only until class moves to o.a.h.h.wal 684 /** Returns the number of log files in use */ 685 public int getNumLogFiles() { 686 // +1 for current use log 687 return getNumRolledLogFiles() + 1; 688 } 689 690 /** 691 * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the 692 * first (oldest) WAL, and return those regions which should be flushed so that it can be 693 * let-go/'archived'. 694 * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file 695 */ 696 Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException { 697 Map<byte[], List<byte[]>> regions = null; 698 int logCount = getNumRolledLogFiles(); 699 if (logCount > this.maxLogs && logCount > 0) { 700 Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry(); 701 regions = 702 this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId); 703 } 704 if (regions != null) { 705 List<String> listForPrint = new ArrayList<>(); 706 for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) { 707 StringBuilder families = new StringBuilder(); 708 for (int i = 0; i < r.getValue().size(); i++) { 709 if (i > 0) { 710 families.append(","); 711 } 712 families.append(Bytes.toString(r.getValue().get(i))); 713 } 714 listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]"); 715 } 716 LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs 717 + "; forcing (partial) flush of " + regions.size() + " region(s): " 718 + StringUtils.join(",", listForPrint)); 719 } 720 return regions; 721 } 722 723 /** 724 * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file. 725 */ 726 protected final void markClosedAndClean(Path path) { 727 WALProps props = walFile2Props.get(path); 728 // typically this should not be null, but if there is no big issue if it is already null, so 729 // let's make the code more robust 730 if (props != null) { 731 props.closed = true; 732 cleanOldLogs(); 733 } 734 } 735 736 /** 737 * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. 738 * <p/> 739 * Use synchronized because we may call this method in different threads, normally when replacing 740 * writer, and since now close writer may be asynchronous, we will also call this method in the 741 * closeExecutor, right after we actually close a WAL writer. 742 */ 743 private synchronized void cleanOldLogs() { 744 List<Pair<Path, Long>> logsToArchive = null; 745 // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids 746 // are older than what is currently in memory, the WAL can be GC'd. 747 for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) { 748 if (!e.getValue().closed) { 749 LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey()); 750 continue; 751 } 752 Path log = e.getKey(); 753 Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; 754 if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { 755 if (logsToArchive == null) { 756 logsToArchive = new ArrayList<>(); 757 } 758 logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); 759 if (LOG.isTraceEnabled()) { 760 LOG.trace("WAL file ready for archiving " + log); 761 } 762 } 763 } 764 765 if (logsToArchive != null) { 766 final List<Pair<Path, Long>> localLogsToArchive = logsToArchive; 767 // make it async 768 for (Pair<Path, Long> log : localLogsToArchive) { 769 logArchiveExecutor.execute(() -> { 770 archive(log); 771 }); 772 this.walFile2Props.remove(log.getFirst()); 773 } 774 } 775 } 776 777 protected void archive(final Pair<Path, Long> log) { 778 totalLogSize.addAndGet(-log.getSecond()); 779 int retry = 1; 780 while (true) { 781 try { 782 archiveLogFile(log.getFirst()); 783 // successful 784 break; 785 } catch (Throwable e) { 786 if (retry > archiveRetries) { 787 LOG.error("Failed log archiving for the log {},", log.getFirst(), e); 788 if (this.abortable != null) { 789 this.abortable.abort("Failed log archiving", e); 790 break; 791 } 792 } else { 793 LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e); 794 } 795 retry++; 796 } 797 } 798 } 799 800 /* 801 * only public so WALSplitter can use. 802 * @return archived location of a WAL file with the given path p 803 */ 804 public static Path getWALArchivePath(Path archiveDir, Path p) { 805 return new Path(archiveDir, p.getName()); 806 } 807 808 protected void archiveLogFile(final Path p) throws IOException { 809 Path newPath = getWALArchivePath(this.walArchiveDir, p); 810 // Tell our listeners that a log is going to be archived. 811 if (!this.listeners.isEmpty()) { 812 for (WALActionsListener i : this.listeners) { 813 i.preLogArchive(p, newPath); 814 } 815 } 816 LOG.info("Archiving " + p + " to " + newPath); 817 if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { 818 throw new IOException("Unable to rename " + p + " to " + newPath); 819 } 820 // Tell our listeners that a log has been archived. 821 if (!this.listeners.isEmpty()) { 822 for (WALActionsListener i : this.listeners) { 823 i.postLogArchive(p, newPath); 824 } 825 } 826 } 827 828 protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { 829 int oldNumEntries = this.numEntries.getAndSet(0); 830 String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; 831 if (oldPath != null) { 832 this.walFile2Props.put(oldPath, 833 new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); 834 this.totalLogSize.addAndGet(oldFileLen); 835 LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", 836 CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), 837 newPathString); 838 } else { 839 LOG.info("New WAL {}", newPathString); 840 } 841 } 842 843 private Span createSpan(String name) { 844 return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName); 845 } 846 847 /** 848 * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}. 849 * <p/> 850 * <ul> 851 * <li>In the case of creating a new WAL, oldPath will be null.</li> 852 * <li>In the case of rolling over from one file to the next, none of the parameters will be null. 853 * </li> 854 * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be 855 * null.</li> 856 * </ul> 857 * @param oldPath may be null 858 * @param newPath may be null 859 * @param nextWriter may be null 860 * @return the passed in <code>newPath</code> 861 * @throws IOException if there is a problem flushing or closing the underlying FS 862 */ 863 Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { 864 return TraceUtil.trace(() -> { 865 doReplaceWriter(oldPath, newPath, nextWriter); 866 return newPath; 867 }, () -> createSpan("WAL.replaceWriter")); 868 } 869 870 protected final void blockOnSync(SyncFuture syncFuture) throws IOException { 871 // Now we have published the ringbuffer, halt the current thread until we get an answer back. 872 try { 873 if (syncFuture != null) { 874 if (closed) { 875 throw new IOException("WAL has been closed"); 876 } else { 877 syncFuture.get(walSyncTimeoutNs); 878 } 879 } 880 } catch (TimeoutIOException tioe) { 881 throw new WALSyncTimeoutIOException(tioe); 882 } catch (InterruptedException ie) { 883 LOG.warn("Interrupted", ie); 884 throw convertInterruptedExceptionToIOException(ie); 885 } catch (ExecutionException e) { 886 throw ensureIOException(e.getCause()); 887 } 888 } 889 890 private static IOException ensureIOException(final Throwable t) { 891 return (t instanceof IOException) ? (IOException) t : new IOException(t); 892 } 893 894 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { 895 Thread.currentThread().interrupt(); 896 IOException ioe = new InterruptedIOException(); 897 ioe.initCause(ie); 898 return ioe; 899 } 900 901 private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException { 902 rollWriterLock.lock(); 903 try { 904 // Return if nothing to flush. 905 if (!force && this.writer != null && this.numEntries.get() <= 0) { 906 return null; 907 } 908 Map<byte[], List<byte[]>> regionsToFlush = null; 909 if (this.closed) { 910 LOG.debug("WAL closed. Skipping rolling of writer"); 911 return regionsToFlush; 912 } 913 try { 914 Path oldPath = getOldPath(); 915 Path newPath = getNewPath(); 916 // Any exception from here on is catastrophic, non-recoverable, so we currently abort. 917 W nextWriter = this.createWriterInstance(newPath); 918 tellListenersAboutPreLogRoll(oldPath, newPath); 919 // NewPath could be equal to oldPath if replaceWriter fails. 920 newPath = replaceWriter(oldPath, newPath, nextWriter); 921 tellListenersAboutPostLogRoll(oldPath, newPath); 922 if (LOG.isDebugEnabled()) { 923 LOG.debug("Create new " + implClassName + " writer with pipeline: " 924 + Arrays.toString(getPipeline())); 925 } 926 // We got a new writer, so reset the slow sync count 927 lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime(); 928 slowSyncCount.set(0); 929 // Can we delete any of the old log files? 930 if (getNumRolledLogFiles() > 0) { 931 cleanOldLogs(); 932 regionsToFlush = findRegionsToForceFlush(); 933 } 934 } catch (CommonFSUtils.StreamLacksCapabilityException exception) { 935 // If the underlying FileSystem can't do what we ask, treat as IO failure, so 936 // we'll abort. 937 throw new IOException( 938 "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.", 939 exception); 940 } 941 return regionsToFlush; 942 } finally { 943 rollWriterLock.unlock(); 944 } 945 } 946 947 @Override 948 public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException { 949 return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter")); 950 } 951 952 // public only until class moves to o.a.h.h.wal 953 /** Returns the size of log files in use */ 954 public long getLogFileSize() { 955 return this.totalLogSize.get(); 956 } 957 958 // public only until class moves to o.a.h.h.wal 959 public void requestLogRoll() { 960 requestLogRoll(ERROR); 961 } 962 963 /** 964 * Get the backing files associated with this WAL. 965 * @return may be null if there are no files. 966 */ 967 FileStatus[] getFiles() throws IOException { 968 return CommonFSUtils.listStatus(fs, walDir, ourFiles); 969 } 970 971 @Override 972 public void shutdown() throws IOException { 973 if (!shutdown.compareAndSet(false, true)) { 974 return; 975 } 976 closed = true; 977 // Tell our listeners that the log is closing 978 if (!this.listeners.isEmpty()) { 979 for (WALActionsListener i : this.listeners) { 980 i.logCloseRequested(); 981 } 982 } 983 984 ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor( 985 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build()); 986 987 Future<Void> future = shutdownExecutor.submit(new Callable<Void>() { 988 @Override 989 public Void call() throws Exception { 990 if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) { 991 try { 992 doShutdown(); 993 if (syncFutureCache != null) { 994 syncFutureCache.clear(); 995 } 996 } finally { 997 rollWriterLock.unlock(); 998 } 999 } else { 1000 throw new IOException("Waiting for rollWriterLock timeout"); 1001 } 1002 return null; 1003 } 1004 }); 1005 shutdownExecutor.shutdown(); 1006 1007 try { 1008 future.get(walShutdownTimeout, TimeUnit.MILLISECONDS); 1009 } catch (InterruptedException e) { 1010 throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); 1011 } catch (TimeoutException e) { 1012 throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" 1013 + " the shutdown of WAL doesn't complete! Please check the status of underlying " 1014 + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS 1015 + "\"", e); 1016 } catch (ExecutionException e) { 1017 if (e.getCause() instanceof IOException) { 1018 throw (IOException) e.getCause(); 1019 } else { 1020 throw new IOException(e.getCause()); 1021 } 1022 } finally { 1023 // in shutdown, we may call cleanOldLogs so shutdown this executor in the end. 1024 // In sync replication implementation, we may shut down a WAL without shutting down the whole 1025 // region server, if we shut down this executor earlier we may get reject execution exception 1026 logArchiveExecutor.shutdown(); 1027 } 1028 // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still 1029 // have some pending archiving tasks not finished yet, and in close we may archive all the 1030 // remaining WAL files, there could be race if we do not wait for the background archive task 1031 // finish 1032 try { 1033 if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) { 1034 throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but" 1035 + " the shutdown of WAL doesn't complete! Please check the status of underlying " 1036 + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS 1037 + "\""); 1038 } 1039 } catch (InterruptedException e) { 1040 throw new InterruptedIOException("Interrupted when waiting for shutdown WAL"); 1041 } 1042 } 1043 1044 @Override 1045 public void close() throws IOException { 1046 shutdown(); 1047 final FileStatus[] files = getFiles(); 1048 if (null != files && 0 != files.length) { 1049 for (FileStatus file : files) { 1050 Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); 1051 // Tell our listeners that a log is going to be archived. 1052 if (!this.listeners.isEmpty()) { 1053 for (WALActionsListener i : this.listeners) { 1054 i.preLogArchive(file.getPath(), p); 1055 } 1056 } 1057 1058 if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { 1059 throw new IOException("Unable to rename " + file.getPath() + " to " + p); 1060 } 1061 // Tell our listeners that a log was archived. 1062 if (!this.listeners.isEmpty()) { 1063 for (WALActionsListener i : this.listeners) { 1064 i.postLogArchive(file.getPath(), p); 1065 } 1066 } 1067 } 1068 LOG.debug( 1069 "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir)); 1070 } 1071 LOG.info("Closed WAL: " + toString()); 1072 } 1073 1074 /** Returns number of WALs currently in the process of closing. */ 1075 public int getInflightWALCloseCount() { 1076 return inflightWALClosures.size(); 1077 } 1078 1079 /** 1080 * updates the sequence number of a specific store. depending on the flag: replaces current seq 1081 * number if the given seq id is bigger, or even if it is lower than existing one 1082 */ 1083 @Override 1084 public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, 1085 boolean onlyIfGreater) { 1086 sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); 1087 } 1088 1089 protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) { 1090 return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync); 1091 } 1092 1093 protected boolean isLogRollRequested() { 1094 return rollRequested.get(); 1095 } 1096 1097 protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) { 1098 // If we have already requested a roll, don't do it again 1099 // And only set rollRequested to true when there is a registered listener 1100 if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) { 1101 for (WALActionsListener i : this.listeners) { 1102 i.logRollRequested(reason); 1103 } 1104 } 1105 } 1106 1107 long getUnflushedEntriesCount() { 1108 long highestSynced = this.highestSyncedTxid.get(); 1109 long highestUnsynced = this.highestUnsyncedTxid; 1110 return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; 1111 } 1112 1113 boolean isUnflushedEntries() { 1114 return getUnflushedEntriesCount() > 0; 1115 } 1116 1117 /** 1118 * Exposed for testing only. Use to tricks like halt the ring buffer appending. 1119 */ 1120 protected void atHeadOfRingBufferEventHandlerAppend() { 1121 // Noop 1122 } 1123 1124 protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException { 1125 // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. 1126 atHeadOfRingBufferEventHandlerAppend(); 1127 long start = EnvironmentEdgeManager.currentTime(); 1128 byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); 1129 long regionSequenceId = entry.getKey().getSequenceId(); 1130 1131 // Edits are empty, there is nothing to append. Maybe empty when we are looking for a 1132 // region sequence id only, a region edit/sequence id that is not associated with an actual 1133 // edit. It has to go through all the rigmarole to be sure we have the right ordering. 1134 if (entry.getEdit().isEmpty()) { 1135 return false; 1136 } 1137 1138 // Coprocessor hook. 1139 coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 1140 if (!listeners.isEmpty()) { 1141 for (WALActionsListener i : listeners) { 1142 i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 1143 } 1144 } 1145 doAppend(writer, entry); 1146 assert highestUnsyncedTxid < entry.getTxid(); 1147 highestUnsyncedTxid = entry.getTxid(); 1148 if (entry.isCloseRegion()) { 1149 // let's clean all the records of this region 1150 sequenceIdAccounting.onRegionClose(encodedRegionName); 1151 } else { 1152 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, 1153 entry.isInMemStore()); 1154 } 1155 coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); 1156 // Update metrics. 1157 postAppend(entry, EnvironmentEdgeManager.currentTime() - start); 1158 numEntries.incrementAndGet(); 1159 return true; 1160 } 1161 1162 private long postAppend(final Entry e, final long elapsedTime) throws IOException { 1163 long len = 0; 1164 if (!listeners.isEmpty()) { 1165 for (Cell cell : e.getEdit().getCells()) { 1166 len += PrivateCellUtil.estimatedSerializedSizeOf(cell); 1167 } 1168 for (WALActionsListener listener : listeners) { 1169 listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); 1170 } 1171 } 1172 return len; 1173 } 1174 1175 protected final void postSync(final long timeInNanos, final int handlerSyncs) { 1176 if (timeInNanos > this.slowSyncNs) { 1177 String msg = new StringBuilder().append("Slow sync cost: ") 1178 .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ") 1179 .append(Arrays.toString(getPipeline())).toString(); 1180 LOG.info(msg); 1181 // A single sync took too long. 1182 // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative 1183 // effects. Here we have a single data point that indicates we should take immediate 1184 // action, so do so. 1185 if (timeInNanos > this.rollOnSyncNs) { 1186 LOG.warn("Requesting log roll because we exceeded slow sync threshold; time=" 1187 + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold=" 1188 + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: " 1189 + Arrays.toString(getPipeline())); 1190 requestLogRoll(SLOW_SYNC); 1191 } 1192 slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this 1193 } 1194 if (!listeners.isEmpty()) { 1195 for (WALActionsListener listener : listeners) { 1196 listener.postSync(timeInNanos, handlerSyncs); 1197 } 1198 } 1199 } 1200 1201 protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, 1202 WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException { 1203 if (this.closed) { 1204 throw new IOException( 1205 "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); 1206 } 1207 MutableLong txidHolder = new MutableLong(); 1208 MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { 1209 txidHolder.setValue(ringBuffer.next()); 1210 }); 1211 long txid = txidHolder.longValue(); 1212 ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall) 1213 .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null); 1214 try { 1215 FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall); 1216 entry.stampRegionSequenceId(we); 1217 ringBuffer.get(txid).load(entry); 1218 } finally { 1219 ringBuffer.publish(txid); 1220 } 1221 return txid; 1222 } 1223 1224 @Override 1225 public String toString() { 1226 return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; 1227 } 1228 1229 /** 1230 * if the given {@code path} is being written currently, then return its length. 1231 * <p> 1232 * This is used by replication to prevent replicating unacked log entries. See 1233 * https://issues.apache.org/jira/browse/HBASE-14004 for more details. 1234 */ 1235 @Override 1236 public OptionalLong getLogFileSizeIfBeingWritten(Path path) { 1237 rollWriterLock.lock(); 1238 try { 1239 Path currentPath = getOldPath(); 1240 if (path.equals(currentPath)) { 1241 // Currently active path. 1242 W writer = this.writer; 1243 return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty(); 1244 } else { 1245 W temp = inflightWALClosures.get(path.getName()); 1246 if (temp != null) { 1247 // In the process of being closed, trailer bytes may or may not be flushed. 1248 // Ensuring that we read all the bytes in a file is critical for correctness of tailing 1249 // use cases like replication, see HBASE-25924/HBASE-25932. 1250 return OptionalLong.of(temp.getSyncedLength()); 1251 } 1252 // Log rolled successfully. 1253 return OptionalLong.empty(); 1254 } 1255 } finally { 1256 rollWriterLock.unlock(); 1257 } 1258 } 1259 1260 @Override 1261 public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { 1262 return TraceUtil.trace(() -> append(info, key, edits, true), 1263 () -> createSpan("WAL.appendData")); 1264 } 1265 1266 @Override 1267 public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException { 1268 return TraceUtil.trace(() -> append(info, key, edits, false), 1269 () -> createSpan("WAL.appendMarker")); 1270 } 1271 1272 /** 1273 * Append a set of edits to the WAL. 1274 * <p/> 1275 * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must 1276 * have its region edit/sequence id assigned else it messes up our unification of mvcc and 1277 * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in. 1278 * <p/> 1279 * NOTE: This appends, at a time that is usually after this call returns, starts a mvcc 1280 * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment 1281 * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must 1282 * 'complete' the transaction this mvcc transaction by calling 1283 * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it 1284 * in the finally of a try/finally block within which this appends lives and any subsequent 1285 * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the 1286 * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not 1287 * immediately available on return from this method. It WILL be available subsequent to a sync of 1288 * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. 1289 * @param info the regioninfo associated with append 1290 * @param key Modified by this call; we add to it this edits region edit/sequence id. 1291 * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit 1292 * sequence id that is after all currently appended edits. 1293 * @param inMemstore Always true except for case where we are writing a region event meta marker 1294 * edit, for example, a compaction completion record into the WAL or noting a 1295 * Region Open event. In these cases the entry is just so we can finish an 1296 * unfinished compaction after a crash when the new Server reads the WAL on 1297 * recovery, etc. These transition event 'Markers' do not go via the memstore. 1298 * When memstore is false, we presume a Marker event edit. 1299 * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id 1300 * in it. 1301 */ 1302 protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) 1303 throws IOException; 1304 1305 protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; 1306 1307 protected abstract W createWriterInstance(Path path) 1308 throws IOException, CommonFSUtils.StreamLacksCapabilityException; 1309 1310 /** 1311 * Notice that you need to clear the {@link #rollRequested} flag in this method, as the new writer 1312 * will begin to work before returning from this method. If we clear the flag after returning from 1313 * this call, we may miss a roll request. The implementation class should choose a proper place to 1314 * clear the {@link #rollRequested} flag, so we do not miss a roll request, typically before you 1315 * start writing to the new writer. 1316 */ 1317 protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) 1318 throws IOException; 1319 1320 protected abstract void doShutdown() throws IOException; 1321 1322 protected abstract boolean doCheckLogLowReplication(); 1323 1324 /** Returns true if we exceeded the slow sync roll threshold over the last check interval */ 1325 protected boolean doCheckSlowSync() { 1326 boolean result = false; 1327 long now = EnvironmentEdgeManager.currentTime(); 1328 long elapsedTime = now - lastTimeCheckSlowSync; 1329 if (elapsedTime >= slowSyncCheckInterval) { 1330 if (slowSyncCount.get() >= slowSyncRollThreshold) { 1331 if (elapsedTime >= (2 * slowSyncCheckInterval)) { 1332 // If two or more slowSyncCheckInterval have elapsed this is a corner case 1333 // where a train of slow syncs almost triggered us but then there was a long 1334 // interval from then until the one more that pushed us over. If so, we 1335 // should do nothing and let the count reset. 1336 if (LOG.isDebugEnabled()) { 1337 LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count=" 1338 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime=" 1339 + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms"); 1340 } 1341 // Fall through to count reset below 1342 } else { 1343 LOG.warn("Requesting log roll because we exceeded slow sync threshold; count=" 1344 + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: " 1345 + Arrays.toString(getPipeline())); 1346 result = true; 1347 } 1348 } 1349 lastTimeCheckSlowSync = now; 1350 slowSyncCount.set(0); 1351 } 1352 return result; 1353 } 1354 1355 public void checkLogLowReplication(long checkInterval) { 1356 long now = EnvironmentEdgeManager.currentTime(); 1357 if (now - lastTimeCheckLowReplication < checkInterval) { 1358 return; 1359 } 1360 // Will return immediately if we are in the middle of a WAL log roll currently. 1361 if (!rollWriterLock.tryLock()) { 1362 return; 1363 } 1364 try { 1365 lastTimeCheckLowReplication = now; 1366 if (doCheckLogLowReplication()) { 1367 requestLogRoll(LOW_REPLICATION); 1368 } 1369 } finally { 1370 rollWriterLock.unlock(); 1371 } 1372 } 1373 1374 /** 1375 * This method gets the pipeline for the current WAL. 1376 */ 1377 abstract DatanodeInfo[] getPipeline(); 1378 1379 /** 1380 * This method gets the datanode replication count for the current WAL. 1381 */ 1382 abstract int getLogReplication(); 1383 1384 private static void split(final Configuration conf, final Path p) throws IOException { 1385 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 1386 if (!fs.exists(p)) { 1387 throw new FileNotFoundException(p.toString()); 1388 } 1389 if (!fs.getFileStatus(p).isDirectory()) { 1390 throw new IOException(p + " is not a directory"); 1391 } 1392 1393 final Path baseDir = CommonFSUtils.getWALRootDir(conf); 1394 Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); 1395 if ( 1396 conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, 1397 AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR) 1398 ) { 1399 archiveDir = new Path(archiveDir, p.getName()); 1400 } 1401 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); 1402 } 1403 1404 W getWriter() { 1405 return this.writer; 1406 } 1407 1408 private static void usage() { 1409 System.err.println("Usage: AbstractFSWAL <ARGS>"); 1410 System.err.println("Arguments:"); 1411 System.err.println(" --dump Dump textual representation of passed one or more files"); 1412 System.err.println(" For example: " 1413 + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE"); 1414 System.err.println(" --split Split the passed directory of WAL logs"); 1415 System.err.println( 1416 " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR"); 1417 } 1418 1419 /** 1420 * Pass one or more log file names, and it will either dump out a text version on 1421 * <code>stdout</code> or split the specified log files. 1422 */ 1423 public static void main(String[] args) throws IOException { 1424 if (args.length < 2) { 1425 usage(); 1426 System.exit(-1); 1427 } 1428 // either dump using the WALPrettyPrinter or split, depending on args 1429 if (args[0].compareTo("--dump") == 0) { 1430 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length)); 1431 } else if (args[0].compareTo("--perf") == 0) { 1432 LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:"); 1433 LOG.error(HBaseMarkers.FATAL, 1434 "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]); 1435 System.exit(-1); 1436 } else if (args[0].compareTo("--split") == 0) { 1437 Configuration conf = HBaseConfiguration.create(); 1438 for (int i = 1; i < args.length; i++) { 1439 try { 1440 Path logPath = new Path(args[i]); 1441 CommonFSUtils.setFsDefault(conf, logPath); 1442 split(conf, logPath); 1443 } catch (IOException t) { 1444 t.printStackTrace(System.err); 1445 System.exit(-1); 1446 } 1447 } 1448 } else { 1449 usage(); 1450 System.exit(-1); 1451 } 1452 } 1453}