/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hbase.wal.WALTailingReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and
 * continually iterates through all the WAL {@link Entry} in the queue. When it's done reading from
 * a Path, it dequeues it and starts reading from the next.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class WALEntryStream implements Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);

  private WALTailingReader reader;
  private WALTailingReader.State state;
  private Path currentPath;
  // cache of next entry for hasNext()
  private Entry currentEntry;
  // position for the current entry. Since we support peek, the upper layer may choose to return
  // before actually consuming the current entry, so it is not safe to return the value below in
  // getPosition.
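  // An illustrative example (hypothetical offsets, not taken from a real run): if the reader has
  // just parsed an entry ending at offset 100 while the previous entry ended at offset 80, then
  // currentPositionOfReader below is 100 while currentPositionOfEntry stays at 80 until next() is
  // called, so getPosition() keeps reporting the position of the last fully consumed entry.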
  private long currentPositionOfEntry = 0;
  // position after reading current entry
  private long currentPositionOfReader = 0;
  private final ReplicationSourceLogQueue logQueue;
  private final String walGroupId;
  private final FileSystem fs;
  private final Configuration conf;
  private final WALFileLengthProvider walFileLengthProvider;
  private final MetricsSource metrics;

  // we should be able to skip empty WAL files, but for safety, we still provide this config
  // see HBASE-18137 for more details
  private boolean eofAutoRecovery;

  /**
   * Create an entry stream over the given queue at the given start position.
   * @param logQueue              the queue of WAL paths
   * @param fs                    the {@link FileSystem} to use to open the WAL files
   * @param conf                  the {@link Configuration} to use to create {@link WALStreamReader}
   *                              for this stream
   * @param startPosition         the position in the first WAL to start reading at
   * @param walFileLengthProvider provides the length of the WAL file
   * @param metrics               the replication metrics
   * @param walGroupId            the name of the WAL group this stream reads from
   */
  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
    long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
    String walGroupId) {
    this.logQueue = logQueue;
    this.fs = fs;
    this.conf = conf;
    this.currentPositionOfEntry = startPosition;
    this.walFileLengthProvider = walFileLengthProvider;
    this.metrics = metrics;
    this.walGroupId = walGroupId;
    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
  }

  public enum HasNext {
    /** means there is a new entry and you can use peek or next to get the current entry */
    YES,
    /**
     * means something went wrong, or we have reached the EOF of the current file but it is not
     * closed yet and there is no new file in the replication queue yet; you should sleep a while
     * and then call hasNext again
     */
    RETRY,
    /**
     * Usually this means we have finished reading a WAL file; to simplify the implementation of
     * this class, we just let the upper layer issue another hasNext call to open the next WAL
     * file.
     */
    RETRY_IMMEDIATELY,
    /**
     * means there is no new entry and the stream has ended; the upper layer should close this
     * stream and release other resources as well
     */
    NO
  }

  /**
   * Try to advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for
   * more details about the meanings of the return values.
   * <p/>
   * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
   * returns {@link HasNext#YES}.
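   * <p/>
   * A rough usage sketch of the hasNext/peek/next protocol (illustrative only; the actual callers
   * in the replication code may look different):
   *
   * <pre>
   * while (true) {
   *   HasNext state = stream.hasNext();
   *   if (state == HasNext.YES) {
   *     Entry entry = stream.next();
   *     // process the entry
   *   } else if (state == HasNext.RETRY) {
   *     // sleep a while, then call hasNext() again
   *   } else if (state == HasNext.RETRY_IMMEDIATELY) {
   *     // call hasNext() again without sleeping
   *   } else { // HasNext.NO
   *     stream.close();
   *     break;
   *   }
   * }
   * </pre>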
   */
  public HasNext hasNext() {
    if (currentEntry == null) {
      return tryAdvanceEntry();
    } else {
      return HasNext.YES;
    }
  }

  /**
   * Returns the next WAL entry in this stream but does not advance.
   * <p/>
   * Must call {@link #hasNext()} first before calling this method, and if you have already called
   * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
   * advance the stream before calling this method again, otherwise it will always return
   * {@code null}.
   * <p/>
   * The reason is that we need to use the return value of {@link #hasNext()} to tell the upper
   * layer whether to retry, so we can not wrap the {@link #hasNext()} call inside {@link #peek()}
   * or {@link #next()} as they have their own return values.
   * @see #hasNext()
   * @see #next()
   */
  public Entry peek() {
    return currentEntry;
  }

  /**
   * Returns the next WAL entry in this stream and advances the stream. Will throw an
   * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this
   * method. Please see the javadoc of the {@link #peek()} method to see why we need this.
   * @throws IllegalStateException Every time you want to call this method, please call
   *                               {@link #hasNext()} first, otherwise an
   *                               {@link IllegalStateException} will be thrown.
   * @see #hasNext()
   * @see #peek()
   */
  public Entry next() {
    if (currentEntry == null) {
      throw new IllegalStateException("Call hasNext first");
    }
    Entry save = peek();
    currentPositionOfEntry = currentPositionOfReader;
    currentEntry = null;
    state = null;
    return save;
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void close() {
    closeReader();
  }

  /** Returns the position of the last Entry returned by next() */
  public long getPosition() {
    return currentPositionOfEntry;
  }

  /** Returns the {@link Path} of the current WAL */
  public Path getCurrentPath() {
    return currentPath;
  }

  private String getCurrentPathStat() {
    StringBuilder sb = new StringBuilder();
    if (currentPath != null) {
      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
        .append(currentPositionOfEntry).append("\n");
    } else {
      sb.append("no replication ongoing, waiting for new log");
    }
    return sb.toString();
  }

  private void setCurrentPath(Path path) {
    this.currentPath = path;
  }

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
      justification = "HDFS-4380")
  private HasNext prepareReader() {
    if (reader != null) {
      if (state != null && state != WALTailingReader.State.NORMAL) {
        // reset before reading
        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
          currentPositionOfEntry, state.resetCompression());
        try {
          if (currentPositionOfEntry > 0) {
            reader.resetTo(currentPositionOfEntry, state.resetCompression());
          } else {
            // we will read from the beginning so we should always clear the compression context
            reader.resetTo(-1, true);
          }
        } catch (FileNotFoundException e) {
          // For now, this could happen only when reading the meta wal for meta replicas.
          // In this case, raising UncheckedIOException will let the endpoint deal with resetting
          // the replication source. See HBASE-27871.
          throw new UncheckedIOException(e);
        } catch (IOException e) {
          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
            currentPositionOfEntry, state.resetCompression(), e);
          // just leave the state as is, and try resetting next time
          return HasNext.RETRY;
        }
      } else {
        return HasNext.YES;
      }
    }
    // try to open the next WAL file
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    Path nextPath = queue.peek();
    if (nextPath == null) {
      LOG.debug("No more WAL files in queue");
      // no more files in queue, this could happen for a recovered queue, or for a wal group of a
      // sync replication peer which has already been transitioned to DA or S.
      setCurrentPath(null);
      return HasNext.NO;
    }
    setCurrentPath(nextPath);
    // we need to test this prior to creating the reader. If not, it is possible that, while
    // opening the file, the file is still being written so its header is incomplete and we get
    // a header EOF, but then while we test whether it is still being written, we have already
    // flushed the data out and we consider it is not being written, and then we just skip over
    // the file, and then we will lose the data written after opening...
    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
      currentPositionOfEntry, beingWritten);
    try {
      reader = WALFactory.createTailingReader(fs, nextPath, conf,
        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
      return HasNext.YES;
    } catch (WALHeaderEOFException e) {
      if (!eofAutoRecovery) {
        // if we do not enable EOF auto recovery, just let the upper layer retry;
        // the replication will usually be stuck and need to be fixed manually
        return HasNext.RETRY;
      }
      // we hit EOF while reading the WAL header, usually this means we can just skip over this
      // file, but we need to be careful about whether this file is still being written; if so we
      // should retry instead of skipping.
      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
        currentPositionOfEntry, e);
      if (beingWritten) {
        // just retry as the file is still being written, maybe next time we could read
        // something
        return HasNext.RETRY;
      } else {
        // the file is not being written so we are safe to just skip over it
        dequeueCurrentLog();
        return HasNext.RETRY_IMMEDIATELY;
      }
    } catch (LeaseNotRecoveredException e) {
      // HBASE-15019 the WAL was not closed due to some hiccup.
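      // Recovering the lease below should let a later attempt open the file and read it to its
      // real end; we then return RETRY so the upper layer will call hasNext() again and retry
      // opening the reader.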
      LOG.warn("Try to recover the WAL lease " + nextPath, e);
      AbstractFSWALProvider.recoverLease(conf, nextPath);
      return HasNext.RETRY;
    } catch (IOException | NullPointerException e) {
      // For why we need to catch NPE here, see HDFS-4380 for more details
      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
      return HasNext.RETRY;
    }
  }

  private HasNext lastAttempt() {
    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
      currentPositionOfEntry, state.resetCompression());
    try {
      reader.resetTo(currentPositionOfEntry, state.resetCompression());
    } catch (IOException e) {
      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
        currentPositionOfEntry, state.resetCompression(), e);
      // just leave the state as is, next time we will try to reset it again; but there is a
      // nasty problem: we will still reach here eventually and try to reset again to see if
      // the log has been fully replicated, which is redundant and can be optimized later
      return HasNext.RETRY;
    }
    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
    state = pair.getFirst();
    // should not be written
    assert !pair.getSecond();
    if (!state.eof()) {
      // we still have something to read after reopen, so return YES. Or something is wrong
      // and we need to retry
      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
    }
    // No data available after reopen
    if (checkAllBytesParsed()) {
      // move to the next wal file and read
      dequeueCurrentLog();
      return HasNext.RETRY_IMMEDIATELY;
    } else {
      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try reading from the
      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
      // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
      // and read.
      currentPositionOfEntry = 0;
      currentPositionOfReader = 0;
      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
      return HasNext.RETRY;
    }
  }

  private HasNext tryAdvanceEntry() {
    HasNext prepared = prepareReader();
    if (prepared != HasNext.YES) {
      return prepared;
    }

    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
    state = pair.getFirst();
    boolean beingWritten = pair.getSecond();
    LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
      beingWritten);
    // The below implementation needs to make sure that when beingWritten == true, we should not
    // dequeue the current WAL file in logQueue.
    switch (state) {
      case NORMAL:
        // everything is fine, just return
        return HasNext.YES;
      case EOF_WITH_TRAILER:
        // in readNextEntryAndRecordReaderPosition, we will acquire the rollWriteLock, and we can
        // only schedule a close writer task, in which we will write the trailer, under the
        // rollWriteLock, so typically if beingWritten == true, we should not reach here, as we
        // need to reopen the reader after writing the trailer.
        // The only possible way to reach here while beingWritten == true is due to the
        // inflightWALClosures logic in AbstractFSWAL: if the writer is still in that map, we will
        // consider it as beingWritten, but actually, here we can make sure that the new WAL file
        // has already been enqueued into the logQueue, so dequeuing the current log file here is
        // safe.
        if (beingWritten && logQueue.getQueue(walGroupId).size() <= 1) {
          // As explained above, if we implement everything correctly, we should not arrive here.
          // But anyway, even if we reach here due to some code changes in the future, reading
          // the file again can make sure that we will not accidentally consider the queue as
          // finished, and since there is a trailer, we will soon consider the file as finished
          // and move on.
          LOG.warn(
            "We have reached the trailer while reading the file '{}' which is currently"
              + " beingWritten, but it is the last file in log queue {}. This should not happen"
              + " typically, try to read again so we will not miss anything",
            currentPath, walGroupId);
          return HasNext.RETRY;
        }
        assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1;
        // we have reached the trailer, which means this WAL file has been closed cleanly and we
        // have finished reading it successfully, just move to the next WAL file and let the upper
        // layer start reading the next WAL file
        dequeueCurrentLog();
        return HasNext.RETRY_IMMEDIATELY;
      case EOF_AND_RESET:
      case EOF_AND_RESET_COMPRESSION:
        if (beingWritten) {
          // just sleep a bit and retry to see if there are new entries coming since the file is
          // still being written
          return HasNext.RETRY;
        }
        // no more entries in this log file, and the file is already closed, i.e., rolled
        // Before dequeuing, we should always get one more attempt at reading.
        // This is in case more entries came in after we opened the reader, and the log is rolled
        // while we were reading. See HBASE-6758
        return lastAttempt();
      case ERROR_AND_RESET:
      case ERROR_AND_RESET_COMPRESSION:
        // we have hit an error, just sleep a bit and retry again
        return HasNext.RETRY;
      default:
        throw new IllegalArgumentException("Unknown read next result: " + state);
    }
  }

  private FileStatus getCurrentPathFileStatus() throws IOException {
    try {
      return fs.getFileStatus(currentPath);
    } catch (FileNotFoundException e) {
      // try archived path
      Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
      if (archivedWAL != null) {
        return fs.getFileStatus(archivedWAL);
      } else {
        throw e;
      }
    }
  }

  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
  private boolean checkAllBytesParsed() {
    // -1 means the wal wasn't closed cleanly.
    final long trailerSize = currentTrailerSize();
    FileStatus stat = null;
    try {
      stat = getCurrentPathFileStatus();
    } catch (IOException e) {
      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
      metrics.incrUnknownFileLengthForClosedWAL();
    }
    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
    // We only call this method when currentEntry is null, so usually they are the same, but there
    // are two exceptions. One is that we have nothing in the file but a header; in this case
    // currentPositionOfEntry will always be 0 since we have no chance to update it. The other is
    // that we reach the end of the file, then currentPositionOfEntry will point to the tail of
    // the last valid entry, and currentPositionOfReader will usually point to the end of the
    // file.
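    // A hypothetical illustration of the checks below (numbers made up): for a cleanly closed
    // file of length 1000 with a 20 byte trailer, a reader position of 980 means everything was
    // parsed, since 980 + 20 is not less than 1000; a reader position of 900 would hit the
    // "too far away from reported file length" branch and force a restart (HBASE-15983).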
    if (stat != null) {
      if (trailerSize < 0) {
        if (currentPositionOfReader < stat.getLen()) {
          final long skippedBytes = stat.getLen() - currentPositionOfReader;
          // See the commits in HBASE-25924/HBASE-25932 for context.
          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
          metrics.incrUncleanlyClosedWALs();
          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
        }
      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
        LOG.warn(
          "Processing end of WAL {} at position {}, which is too far away from"
            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
        metrics.incrRestartedWALReading();
        metrics.incrRepeatedFileBytes(currentPositionOfReader);
        return false;
      }
    }
    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
      stat == null ? "N/A" : stat.getLen());
    metrics.incrCompletedWAL();
    return true;
  }

  private void dequeueCurrentLog() {
    LOG.debug("EOF, closing {}", currentPath);
    closeReader();
    logQueue.remove(walGroupId);
    setCurrentPath(null);
    currentPositionOfEntry = 0;
    state = null;
  }

  /**
   * Reads the next entry and records the reader position. The boolean in the returned pair
   * indicates whether the file is still opened for writing.
   */
  private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
    OptionalLong fileLength;
    if (logQueue.getQueueSize(walGroupId) > 1) {
      // if there is more than one file in the queue, although it is possible that we are
      // still trying to write the trailer of the file and it is not closed yet, we can
      // make sure that we will not write any WAL entries to it any more, so it is safe
      // to just let the upper layer try to read the whole file without limit
      fileLength = OptionalLong.empty();
    } else {
      // if there is only one file in the queue, check whether it is still being written to.
      // We must call this before actually reading from the reader, as this method will acquire
      // the rollWriteLock. This is very important, as we will enqueue the new WAL file in
      // postLogRoll, and before this happens, we could have already finished closing the previous
      // WAL file. If we do not acquire the rollWriteLock and return whether the current file is
      // being written to, we may finish reading the previous WAL file and start to read the next
      // one before it is enqueued into the logQueue, thus leading to an empty logQueue and making
      // the shipper think the queue has already ended and quit. See HBASE-28114 and related
      // issues for more details.
      // In the future, if we want to optimize the logic here, for example, do not call this
      // method every time, or do not acquire the rollWriteLock in the implementation of this
      // method, we need to carefully review the optimized implementation.
      fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
    }
    WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
    long readerPos = readResult.getEntryEndPos();
    Entry readEntry = readResult.getEntry();
    if (readResult.getState() == WALTailingReader.State.NORMAL) {
      LOG.trace("reading entry: {} ", readEntry);
      metrics.incrLogEditsRead();
      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
      // record current entry and reader position
      currentEntry = readResult.getEntry();
      this.currentPositionOfReader = readerPos;
    } else {
      LOG.trace("reading entry failed with: {}", readResult.getState());
      // set current entry to null
      currentEntry = null;
      try {
        this.currentPositionOfReader = reader.getPosition();
      } catch (IOException e) {
        LOG.warn("failed to get current position of reader", e);
        if (readResult.getState().resetCompression()) {
          return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
            fileLength.isPresent());
        }
      }
    }
    return Pair.newPair(readResult.getState(), fileLength.isPresent());
  }

  private void closeReader() {
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }

  private long currentTrailerSize() {
    long size = -1L;
    if (reader instanceof AbstractProtobufWALReader) {
      final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
      size = pbwr.trailerSize();
    }
    return size;
  }
}