/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;
import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor;

/**
 * An asynchronous implementation of FSWAL.
 * <p>
 * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog.
 * <p>
 * For append, we process it as follows:
 * <ol>
 * <li>In the caller thread (typically, in the rpc handler thread):
 * <ol>
 * <li>Insert the entry into 'waitingConsumePayloads'. Use the ringbuffer sequence as txid.</li>
 * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more
 * details.</li>
 * </ol>
 * </li>
 * <li>In the consumer task (executed in a single-threaded thread pool):
 * <ol>
 * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
 * {@link #toWriteAppends}</li>
 * <li>Poll the entry from {@link #toWriteAppends}, append it to the AsyncWriter, and insert it into
 * {@link #unackedAppends}</li>
 * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call
 * sync on the AsyncWriter.</li>
 * <li>In the callback methods:
 * <ul>
 * <li>If succeeded, poll the entry from {@link #unackedAppends} and drop it.</li>
 * <li>If failed, add all the entries in {@link #unackedAppends} back to {@link #toWriteAppends} and
 * wait for writing them again.</li>
 * </ul>
 * </li>
 * </ol>
 * </li>
 * </ol>
 * For sync, the processing stages are almost the same. Unlike FSHLog, if we hit a sync error we
 * will open a new writer, rewrite the unacked entries to the new writer, and sync again.
 * <p>
 * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as
 * in FSHLog.<br>
 * For a normal roll request (for example, we have reached the log roll size):
 * <ol>
 * <li>In the log roller thread, we will set {@link #waitingRoll} to true and
 * {@link #readyForRolling} to false, and then wait on {@link #readyForRolling} (see
 * {@link #waitForSafePoint()}).</li>
 * <li>In the consumer thread, we will stop polling entries from {@link #waitingConsumePayloads} if
 * {@link #waitingRoll} is true, and also stop writing the entries in {@link #toWriteAppends} out.
 * </li>
 * <li>If there is unflushed data in the writer, sync it.</li>
 * <li>When all outgoing sync requests are finished, i.e., {@link #unackedAppends} is empty, signal
 * the {@link #readyForRollingCond}.</li>
 * <li>Back in the log roller thread, we can now confirm that there are no outgoing entries, i.e.,
 * we have reached a safe point. So it is safe to replace the old writer with the new writer
 * now.</li>
 * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
 * <li>Schedule the consumer task.</li>
 * <li>Schedule a background task to close the old writer.</li>
 * </ol>
 * For a broken writer roll request, the only difference is that we can bypass the wait for the
 * safe point stage.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

  private static final Logger LOG = LoggerFactory.getLogger(AsyncFSWAL.class);

  private static final Comparator<SyncFuture> SEQ_COMPARATOR =
    Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);

  public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
  public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
    "hbase.wal.async.use-shared-event-loop";
  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;

  public static final String ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS =
    "hbase.wal.async.wait.on.shutdown.seconds";
  public static final int DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;

  private final EventLoopGroup eventLoopGroup;

  private final ExecutorService consumeExecutor;

  private final Class<? extends Channel> channelClass;

  private final Lock consumeLock = new ReentrantLock();

  private final Runnable consumer = this::consume;

  // check if there is already a consumer task in the event loop's task queue
  private final Supplier<Boolean> hasConsumerTask;

  private static final int MAX_EPOCH = 0x3FFFFFFF;
  // the lowest bit is waitingRoll, which means a new writer has been created and we are waiting
  // for the old writer to be closed.
  // the second-lowest bit is writerBroken, which means the current writer is broken and rollWriter
  // is needed.
  // all other bits are the epoch number of the current writer; this is used to detect whether the
  // writer is still the same one as when the sync was issued.
  // notice that modification to this field is only allowed under the protection of consumeLock.
  private volatile int epochAndState;

  private boolean readyForRolling;

  private final Condition readyForRollingCond = consumeLock.newCondition();

  private final RingBuffer<RingBufferTruck> waitingConsumePayloads;

  private final Sequence waitingConsumePayloadsGatingSequence;

  private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);

  private final long batchSize;

  private volatile AsyncFSOutput fsOut;

  private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();

  private final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();

  private final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);

  // the highest txid of WAL entries being processed
  private long highestProcessedAppendTxid;

  // file length when we issued the last sync request on the writer
  private long fileLengthAtLastSync;

  private long highestProcessedAppendTxidAtLastSync;

  private final int waitOnShutdownInSeconds;

  private final StreamSlowMonitor streamSlowMonitor;

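  // Convenience constructor: passes a null Abortable and creates a default StreamSlowMonitor.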
  public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
    Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
    String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
    throws FailedLogCloseException, IOException {
    this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
  }

  public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
    String archiveDir, Configuration conf, List<WALActionsListener> listeners,
    boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
    throws FailedLogCloseException, IOException {
    super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
      suffix);
    this.eventLoopGroup = eventLoopGroup;
    this.channelClass = channelClass;
    this.streamSlowMonitor = monitor;
    Supplier<Boolean> hasConsumerTask;
    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
      this.consumeExecutor = eventLoopGroup.next();
      if (consumeExecutor instanceof SingleThreadEventExecutor) {
        try {
          Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
          field.setAccessible(true);
          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
          hasConsumerTask = () -> queue.peek() == consumer;
        } catch (Exception e) {
          LOG.warn("Can not get task queue of " + consumeExecutor
            + ", this is not necessary, just give up", e);
          hasConsumerTask = () -> false;
        }
      } else {
        hasConsumerTask = () -> false;
      }
    } else {
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder()
          .setNameFormat("AsyncFSWAL-%d-" + rootDir.toString()).setDaemon(true).build());
      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
      this.consumeExecutor = threadPool;
    }

    this.hasConsumerTask = hasConsumerTask;
    int preallocatedEventCount =
      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
    waitingConsumePayloads =
      RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);
    waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);

    // increase the ringbuffer sequence so our txid starts from 1
    waitingConsumePayloads.publish(waitingConsumePayloads.next());
    waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());

    batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
    waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
      DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
  }

  /**
   * Helper that marks the future as DONE and offers it back to the cache.
   */
  private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
    future.done(txid, t);
    syncFutureCache.offer(future);
  }

  private static boolean waitingRoll(int epochAndState) {
    return (epochAndState & 1) != 0;
  }

  private static boolean writerBroken(int epochAndState) {
    return ((epochAndState >>> 1) & 1) != 0;
  }

  private static int epoch(int epochAndState) {
    return epochAndState >>> 2;
  }

  // return whether we have successfully set readyForRolling to true.
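  // Called from the consumer thread once a roll has been requested and all unacked appends have
  // been acked; it signals readyForRollingCond so the roller blocked in waitForSafePoint() can
  // proceed.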
  private boolean trySetReadyForRolling() {
    // Check without holding the lock first. Usually we will just return here.
    // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is
    // safe to check them outside the consumeLock.
    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
      return false;
    }
    consumeLock.lock();
    try {
      // 1. a roll is requested
      // 2. all outgoing entries have been acked (we have confirmed above).
      if (waitingRoll(epochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        return true;
      } else {
        return false;
      }
    } finally {
      consumeLock.unlock();
    }
  }

  private void syncFailed(long epochWhenSync, Throwable error) {
    LOG.warn("sync failed", error);
    boolean shouldRequestLogRoll = true;
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
        // this is not the previous writer, which means we have already rolled the writer,
        // or this is still the current writer but we have already marked it as broken and
        // requested a roll.
        return;
      }
      this.epochAndState = currentEpochAndState | 0b10;
      if (waitingRoll(currentEpochAndState)) {
        readyForRolling = true;
        readyForRollingCond.signalAll();
        // this means we are already in the middle of a rollWriter, so just tell the roller thread
        // that it can continue without requesting an extra log roll.
        shouldRequestLogRoll = false;
      }
    } finally {
      consumeLock.unlock();
    }
    for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
      toWriteAppends.addFirst(iter.next());
    }
    highestUnsyncedTxid = highestSyncedTxid.get();
    if (shouldRequestLogRoll) {
      // request a roll.
      requestLogRoll(ERROR);
    }
  }

  private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processedTxid,
    long startTimeNs) {
    // Please see the last several comments on HBASE-22761: it is possible that we get a
    // syncCompleted which acks a previous sync request after we received a syncFailed on the same
    // writer. So here we will also check the epoch and state; if the epoch has already been
    // changed, i.e., we have already rolled the writer, or the writer is already broken, we should
    // just skip here, to avoid messing up the state or accidentally releasing some WAL entries and
    // causing data corruption.
    // The syncCompleted call is on the critical write path, so we should try our best to make it
    // fast. So here we do not hold consumeLock, for better performance. It is safe because there
    // are only 3 possible situations:
    // 1. For the normal case, the only place where we change epochAndState is when rolling the
    // writer. Before rolling actually happens, we will only change the state to waitingRoll, which
    // is a different bit from writerBroken, and when we actually change the epoch, we can make
    // sure that there is no outgoing sync request. So we will always pass the check here and
    // there is no problem.
    // 2. The writer is broken, but we have not called syncFailed yet. In this case, since
    // syncFailed and syncCompleted are executed in the same thread, we will just face the same
    // situation as #1.
    // 3. The writer is broken, and syncFailed has been called.
    // Then when we arrive here, there are only 2 possible situations:
    // a. we arrive before we actually roll the writer, then we will find out the writer is broken
    // and give up.
    // b. we arrive after we actually roll the writer, then we will find out the epoch has changed
    // and give up.
    // For both #a and #b, we do not need to hold the consumeLock as we will always update the
    // epochAndState as a whole.
    // So in general, for all the cases above, we do not need to hold the consumeLock.
    int epochAndState = this.epochAndState;
    if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {
      LOG.warn("Got a sync complete call after the writer is broken, skip");
      return;
    }
    highestSyncedTxid.set(processedTxid);
    for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      if (entry.getTxid() <= processedTxid) {
        entry.release();
        iter.remove();
      } else {
        break;
      }
    }
    postSync(System.nanoTime() - startTimeNs, finishSync());
    if (trySetReadyForRolling()) {
      // we have just finished a roll, so we do not need to check for log rolling; the writer will
      // be closed soon.
      return;
    }
    // If we haven't already requested a roll, check if we have exceeded logrollsize
    if (!isLogRollRequested() && writer.getLength() > logrollsize) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Requesting log roll because of file size threshold; length="
          + writer.getLength() + ", logrollsize=" + logrollsize);
      }
      requestLogRoll(SIZE);
    }
  }

  // find all the sync futures between these two txids to see if we need to issue an hsync; if
  // there are no sync futures then just use the default.
  private boolean isHsync(long beginTxid, long endTxid) {
    SortedSet<SyncFuture> futures = syncFutures.subSet(new SyncFuture().reset(beginTxid, false),
      new SyncFuture().reset(endTxid + 1, false));
    if (futures.isEmpty()) {
      return useHsync;
    }
    for (SyncFuture future : futures) {
      if (future.isForceSync()) {
        return true;
      }
    }
    return false;
  }

  private void sync(AsyncWriter writer) {
    fileLengthAtLastSync = writer.getLength();
    long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
    boolean shouldUseHsync =
      isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);
    highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
    final long startTimeNs = System.nanoTime();
    final long epoch = (long) epochAndState >>> 2L;
    addListener(writer.sync(shouldUseHsync), (result, error) -> {
      if (error != null) {
        syncFailed(epoch, error);
      } else {
        syncCompleted(epoch, writer, currentHighestProcessedAppendTxid, startTimeNs);
      }
    }, consumeExecutor);
  }

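  // Complete (and recycle) every pending sync future whose txid is <= the given txid; returns the
  // number of futures finished.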
  private int finishSyncLowerThanTxid(long txid) {
    int finished = 0;
    for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
      SyncFuture sync = iter.next();
      if (sync.getTxid() <= txid) {
        markFutureDoneAndOffer(sync, txid, null);
        iter.remove();
        finished++;
      } else {
        break;
      }
    }
    return finished;
  }

  // try advancing the highestSyncedTxid as much as possible
  private int finishSync() {
    if (unackedAppends.isEmpty()) {
      // All outstanding appends have been acked.
      if (toWriteAppends.isEmpty()) {
        // Also no appends waiting to be written out, so just finish all pending syncs.
        long maxSyncTxid = highestSyncedTxid.get();
        for (SyncFuture sync : syncFutures) {
          maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
          sync.done(maxSyncTxid, null);
        }
        highestSyncedTxid.set(maxSyncTxid);
        int finished = syncFutures.size();
        syncFutures.clear();
        return finished;
      } else {
        // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid,
        // so if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txids are
        // between highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.
        long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();
        assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;
        long doneTxid = lowestUnprocessedAppendTxid - 1;
        highestSyncedTxid.set(doneTxid);
        return finishSyncLowerThanTxid(doneTxid);
      }
    } else {
      // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the
      // first unacked append minus 1.
      long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();
      long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get());
      highestSyncedTxid.set(doneTxid);
      return finishSyncLowerThanTxid(doneTxid);
    }
  }

  // confirm non-empty before calling
  private static long getLastTxid(Deque<FSWALEntry> queue) {
    return queue.peekLast().getTxid();
  }

  private void appendAndSync() {
    final AsyncWriter writer = this.writer;
    // maybe a sync request is not queued when we issue a sync, so check here to see if we can
    // finish some.
    finishSync();
    long newHighestProcessedAppendTxid = -1L;
    // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single
    // threaded, so this could save us some cycles
    boolean addedToUnackedAppends = false;
    for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {
      FSWALEntry entry = iter.next();
      boolean appended;
      try {
        appended = appendEntry(writer, entry);
      } catch (IOException e) {
        throw new AssertionError("should not happen", e);
      }
      newHighestProcessedAppendTxid = entry.getTxid();
      iter.remove();
      if (appended) {
        // This is possible: when we fail to sync, we will add the unackedAppends back to
        // toWriteAppends, so here we may get an entry which is already in the unackedAppends.
        if (
          addedToUnackedAppends || unackedAppends.isEmpty()
            || getLastTxid(unackedAppends) < entry.getTxid()
        ) {
          unackedAppends.addLast(entry);
          addedToUnackedAppends = true;
        }
        // See HBASE-25905: here we need to make sure that we always write all the entries in
        // unackedAppends out, as the code in the consume method assumes that the entries in
        // unackedAppends have all been sent out, so if there is a roll request and unackedAppends
        // is not empty, we can just return as later there will be a syncCompleted call to clear
        // the unackedAppends, or a syncFailed to lead us to another state.
        // There could be other ways to fix this, such as changing the logic in the consume method,
        // but that would break the assumption and then (may) lead to a big refactoring. So here
        // let's use this way to fix it first; we can optimize later.
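        // Break out to issue a sync only when the entry we just handled is the last one tracked in
        // unackedAppends, so that everything in unackedAppends has already been handed to the
        // writer.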
        if (
          writer.getLength() - fileLengthAtLastSync >= batchSize
            && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))
        ) {
          break;
        }
      }
    }
    // if we have a newer transaction id, update it.
    // otherwise, use the previous transaction id.
    if (newHighestProcessedAppendTxid > 0) {
      highestProcessedAppendTxid = newHighestProcessedAppendTxid;
    } else {
      newHighestProcessedAppendTxid = highestProcessedAppendTxid;
    }

    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
      // sync because of the buffer size limit.
      sync(writer);
      return;
    }
    if (writer.getLength() == fileLengthAtLastSync) {
      // we haven't written anything out, just advance the highestSyncedSequence since we may only
      // stamp some region sequence id.
      if (unackedAppends.isEmpty()) {
        highestSyncedTxid.set(highestProcessedAppendTxid);
        finishSync();
        trySetReadyForRolling();
      }
      return;
    }
    // reaching here means that we have some unsynced data but haven't reached the batch size yet,
    // but we will not issue a sync directly here even if there are sync requests, because we may
    // have some new data in the ringbuffer, so let's just return here and delay the decision of
    // whether to issue a sync to the caller method.
  }

  private void consume() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState)) {
        return;
      }
      if (waitingRoll(currentEpochAndState)) {
        if (writer.getLength() > fileLengthAtLastSync) {
          // issue a sync
          sync(writer);
        } else {
          if (unackedAppends.isEmpty()) {
            readyForRolling = true;
            readyForRollingCond.signalAll();
          }
        }
        return;
      }
    } finally {
      consumeLock.unlock();
    }
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case APPEND:
          toWriteAppends.addLast(truck.unloadAppend());
          break;
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          LOG.warn("RingBufferTruck with unexpected type: " + truck.type());
          break;
      }
      waitingConsumePayloadsGatingSequence.set(nextCursor);
    }
    appendAndSync();
    if (hasConsumerTask.get()) {
      return;
    }
    if (toWriteAppends.isEmpty()) {
      if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
        consumerScheduled.set(false);
        // recheck here since in append and sync we do not hold the consumeLock. Things may happen
        // like
        // 1. we check the cursor, no new entry
        // 2. someone publishes a new entry to the ringbuffer, sees that consumerScheduled is true,
        // and gives up scheduling the consumer task.
        // 3. we set consumerScheduled to false and also give up scheduling the consumer task.
        if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {
          // we will give up consuming, so if there is some unsynced data we need to issue a sync.
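          // Only issue a new sync if the newest pending sync future targets data beyond what the
          // last sync request already covered; otherwise the outstanding sync will complete it.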
          if (
            writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()
              && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync
          ) {
            // no new data in the ringbuffer and we have at least one sync request
            sync(writer);
          }
          return;
        } else {
          // maybe someone has grabbed this before us
          if (!consumerScheduled.compareAndSet(false, true)) {
            return;
          }
        }
      }
    }
    // reschedule if we still have something to write.
    consumeExecutor.execute(consumer);
  }

  private boolean shouldScheduleConsumer() {
    int currentEpochAndState = epochAndState;
    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
      return false;
    }
    return consumerScheduled.compareAndSet(false, true);
  }

  @Override
  protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
    throws IOException {
    long txid =
      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    return txid;
  }

  @Override
  protected void doSync(boolean forceSync) throws IOException {
    long txid = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(txid);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(txid);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected void doSync(long txid, boolean forceSync) throws IOException {
    if (highestSyncedTxid.get() >= txid) {
      return;
    }
    // here we do not use the ring buffer sequence as the txid
    long sequence = waitingConsumePayloads.next();
    SyncFuture future;
    try {
      future = getSyncFuture(txid, forceSync);
      RingBufferTruck truck = waitingConsumePayloads.get(sequence);
      truck.load(future);
    } finally {
      waitingConsumePayloads.publish(sequence);
    }
    if (shouldScheduleConsumer()) {
      consumeExecutor.execute(consumer);
    }
    blockOnSync(future);
  }

  @Override
  protected AsyncWriter createWriterInstance(Path path) throws IOException {
    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
      eventLoopGroup, channelClass, streamSlowMonitor);
  }

  private void waitForSafePoint() {
    consumeLock.lock();
    try {
      int currentEpochAndState = epochAndState;
      if (writerBroken(currentEpochAndState) || this.writer == null) {
        return;
      }
      consumerScheduled.set(true);
      epochAndState = currentEpochAndState | 1;
      readyForRolling = false;
      consumeExecutor.execute(consumer);
      while (!readyForRolling) {
        readyForRollingCond.awaitUninterruptibly();
      }
    } finally {
      consumeLock.unlock();
    }
  }

  private void closeWriter(AsyncWriter writer, Path path) {
    inflightWALClosures.put(path.getName(), writer);
    closeExecutor.execute(() -> {
      try {
        writer.close();
      } catch (IOException e) {
        LOG.warn("close old writer failed", e);
      } finally {
        // call this even if the above close fails, as there is no other chance for us to set
        // closed to true; it will not cause big problems.
        markClosedAndClean(path);
        inflightWALClosures.remove(path.getName());
      }
    });
  }

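  // Called from rollWriter (both the initial roll in init and later rolls from the log roller):
  // wait for a safe point, swap in the new writer, bump the epoch (which also clears waitingRoll
  // and writerBroken), and reschedule the consumer.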
  @Override
  protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
    throws IOException {
    Preconditions.checkNotNull(nextWriter);
    waitForSafePoint();
    // we will call rollWriter in the init method, where we want to create the first writer and
    // obviously the previous writer is null, so here we need this null check. And the reason we
    // must call logRollAndSetupWalProps before closeWriter is that we will call markClosedAndClean
    // after closing the writer asynchronously, so we need to make sure the WALProps is put into
    // walFile2Props before we call markClosedAndClean.
    if (writer != null) {
      long oldFileLen = writer.getLength();
      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
      closeWriter(writer, oldPath);
    } else {
      logRollAndSetupWalProps(oldPath, newPath, 0);
    }

    this.writer = nextWriter;
    if (nextWriter instanceof AsyncProtobufLogWriter) {
      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
    }
    this.fileLengthAtLastSync = nextWriter.getLength();
    this.highestProcessedAppendTxidAtLastSync = 0L;
    consumeLock.lock();
    try {
      consumerScheduled.set(true);
      int currentEpoch = epochAndState >>> 2;
      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
      // set a new epoch and also clear waitingRoll and writerBroken
      this.epochAndState = nextEpoch << 2;
      // Reset rollRequested status
      rollRequested.set(false);
      consumeExecutor.execute(consumer);
    } finally {
      consumeLock.unlock();
    }
  }

  @Override
  protected void doShutdown() throws IOException {
    waitForSafePoint();
    closeWriter(this.writer, getOldPath());
    this.writer = null;
    closeExecutor.shutdown();
    try {
      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
          + " the close of the async writer doesn't complete."
          + " Please check the status of the underlying filesystem"
          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
          + "\"");
      }
    } catch (InterruptedException e) {
      LOG.error("The wait for close of async writer is interrupted");
      Thread.currentThread().interrupt();
    }
    IOException error = new IOException("WAL has been closed");
    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
    // drain all the pending sync requests
    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
        <= cursorBound; nextCursor++) {
      if (!waitingConsumePayloads.isPublished(nextCursor)) {
        break;
      }
      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
      switch (truck.type()) {
        case SYNC:
          syncFutures.add(truck.unloadSync());
          break;
        default:
          break;
      }
    }
    // and fail them
    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
    if (!(consumeExecutor instanceof EventLoop)) {
      consumeExecutor.shutdown();
    }
  }

  @Override
  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

  @Override
  DatanodeInfo[] getPipeline() {
    AsyncFSOutput output = this.fsOut;
    return output != null ? output.getPipeline() : new DatanodeInfo[0];
  }

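  // The effective replication of the current WAL file is just the number of datanodes in the
  // output pipeline.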
  @Override
  int getLogReplication() {
    return getPipeline().length;
  }

  @Override
  protected boolean doCheckLogLowReplication() {
    // unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
    // typically there is no 'low replication' state, only a 'broken' state.
    AsyncFSOutput output = this.fsOut;
    return output != null && output.isBroken();
  }
}