001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.util.OptionalLong;
024import java.util.concurrent.PriorityBlockingQueue;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.fs.FileStatus;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
030import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
031import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
032import org.apache.hadoop.hbase.util.Pair;
033import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
034import org.apache.hadoop.hbase.wal.WAL.Entry;
035import org.apache.hadoop.hbase.wal.WALFactory;
036import org.apache.hadoop.hbase.wal.WALStreamReader;
037import org.apache.hadoop.hbase.wal.WALTailingReader;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
045 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
046 * dequeues it and starts reading from the next.
047 */
048@InterfaceAudience.Private
049@InterfaceStability.Evolving
050class WALEntryStream implements Closeable {
051  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
052
053  private WALTailingReader reader;
054  private WALTailingReader.State state;
055  private Path currentPath;
056  // cache of next entry for hasNext()
057  private Entry currentEntry;
058  // position for the current entry. As now we support peek, which means that the upper layer may
059  // choose to return before reading the current entry, so it is not safe to return the value below
060  // in getPosition.
061  private long currentPositionOfEntry = 0;
062  // position after reading current entry
063  private long currentPositionOfReader = 0;
064  private final ReplicationSourceLogQueue logQueue;
065  private final String walGroupId;
066  private final FileSystem fs;
067  private final Configuration conf;
068  private final WALFileLengthProvider walFileLengthProvider;
069  private final MetricsSource metrics;
070
071  // we should be able to skip empty WAL files, but for safety, we still provide this config
072  // see HBASE-18137 for more details
073  private boolean eofAutoRecovery;
074
075  /**
076   * Create an entry stream over the given queue at the given start position
077   * @param logQueue              the queue of WAL paths
078   * @param conf                  the {@link Configuration} to use to create {@link WALStreamReader}
079   *                              for this stream
080   * @param startPosition         the position in the first WAL to start reading at
081   * @param walFileLengthProvider provides the length of the WAL file
082   * @param serverName            the server name which all WALs belong to
083   * @param metrics               the replication metrics
084   */
085  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
086    long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
087    String walGroupId) {
088    this.logQueue = logQueue;
089    this.fs = fs;
090    this.conf = conf;
091    this.currentPositionOfEntry = startPosition;
092    this.walFileLengthProvider = walFileLengthProvider;
093    this.metrics = metrics;
094    this.walGroupId = walGroupId;
095    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
096  }
097
098  public enum HasNext {
099    /** means there is a new entry and you could use peek or next to get current entry */
100    YES,
101    /**
102     * means there are something wrong or we have reached EOF of the current file but it is not
103     * closed yet and there is no new file in the replication queue yet, you should sleep a while
104     * and try to call hasNext again
105     */
106    RETRY,
107    /**
108     * Usually this means we have finished reading a WAL file, and for simplify the implementation
109     * of this class, we just let the upper layer issue a new hasNext call again to open the next
110     * WAL file.
111     */
112    RETRY_IMMEDIATELY,
113    /**
114     * means there is no new entry and stream is end, the upper layer should close this stream and
115     * release other resources as well
116     */
117    NO
118  }
119
120  /**
121   * Try advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for more
122   * details about the meanings of the return values.
123   * <p/>
124   * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
125   * returns {@link HasNext#YES}.
126   */
127  public HasNext hasNext() {
128    if (currentEntry == null) {
129      return tryAdvanceEntry();
130    } else {
131      return HasNext.YES;
132    }
133  }
134
135  /**
136   * Returns the next WAL entry in this stream but does not advance.
137   * <p/>
138   * Must call {@link #hasNext()} first before calling this method, and if you have already called
139   * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
140   * advance the stream before calling this method again, otherwise it will always return
141   * {@code null}
142   * <p/>
143   * The reason here is that, we need to use the return value of {@link #hasNext()} to tell upper
144   * layer to retry or not, so we can not wrap the {@link #hasNext()} call inside {@link #peek()} or
145   * {@link #next()} as they have their own return value.
146   * @see #hasNext()
147   * @see #next()
148   */
149  public Entry peek() {
150    return currentEntry;
151  }
152
153  /**
154   * Returns the next WAL entry in this stream and advance the stream. Will throw
155   * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method.
156   * Please see the javadoc of {@link #peek()} method to see why we need this.
157   * @throws IllegalStateException Every time you want to call this method, please call
158   *                               {@link #hasNext()} first, otherwise a
159   *                               {@link IllegalStateException} will be thrown.
160   * @see #hasNext()
161   * @see #peek()
162   */
163  public Entry next() {
164    if (currentEntry == null) {
165      throw new IllegalStateException("Call hasNext first");
166    }
167    Entry save = peek();
168    currentPositionOfEntry = currentPositionOfReader;
169    currentEntry = null;
170    state = null;
171    return save;
172  }
173
174  /**
175   * {@inheritDoc}
176   */
177  @Override
178  public void close() {
179    closeReader();
180  }
181
182  /** Returns the position of the last Entry returned by next() */
183  public long getPosition() {
184    return currentPositionOfEntry;
185  }
186
187  /** Returns the {@link Path} of the current WAL */
188  public Path getCurrentPath() {
189    return currentPath;
190  }
191
192  private String getCurrentPathStat() {
193    StringBuilder sb = new StringBuilder();
194    if (currentPath != null) {
195      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
196        .append(currentPositionOfEntry).append("\n");
197    } else {
198      sb.append("no replication ongoing, waiting for new log");
199    }
200    return sb.toString();
201  }
202
203  private void setCurrentPath(Path path) {
204    this.currentPath = path;
205  }
206
207  private void resetReader() throws IOException {
208    if (currentPositionOfEntry > 0) {
209      reader.resetTo(currentPositionOfEntry, state.resetCompression());
210    } else {
211      // we will read from the beginning so we should always clear the compression context
212      reader.resetTo(-1, true);
213    }
214  }
215
216  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
217      justification = "HDFS-4380")
218  private HasNext prepareReader() {
219    if (reader != null) {
220      if (state != null && state != WALTailingReader.State.NORMAL) {
221        // reset before reading
222        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
223          currentPositionOfEntry, state.resetCompression());
224        try {
225          resetReader();
226          return HasNext.YES;
227        } catch (IOException e) {
228          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
229            currentPositionOfEntry, state.resetCompression(), e);
230          // just leave the state as is, and try resetting next time
231          return HasNext.RETRY;
232        }
233      } else {
234        return HasNext.YES;
235      }
236    }
237    // try open next WAL file
238    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
239    Path nextPath = queue.peek();
240    if (nextPath == null) {
241      LOG.debug("No more WAL files in queue");
242      // no more files in queue, this could happen for recovered queue, or for a wal group of a
243      // sync replication peer which has already been transited to DA or S.
244      setCurrentPath(null);
245      return HasNext.NO;
246    }
247    setCurrentPath(nextPath);
248    // we need to test this prior to create the reader. If not, it is possible that, while
249    // opening the file, the file is still being written so its header is incomplete and we get
250    // a header EOF, but then while we test whether it is still being written, we have already
251    // flushed the data out and we consider it is not being written, and then we just skip over
252    // file, then we will lose the data written after opening...
253    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
254    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
255      currentPositionOfEntry, beingWritten);
256    try {
257      reader = WALFactory.createTailingReader(fs, nextPath, conf,
258        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
259      return HasNext.YES;
260    } catch (WALHeaderEOFException e) {
261      if (!eofAutoRecovery) {
262        // if we do not enable EOF auto recovery, just let the upper layer retry
263        // the replication will be stuck usually, and need to be fixed manually
264        return HasNext.RETRY;
265      }
266      // we hit EOF while reading the WAL header, usually this means we can just skip over this
267      // file, but we need to be careful that whether this file is still being written, if so we
268      // should retry instead of skipping.
269      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
270        currentPositionOfEntry, e);
271      if (beingWritten) {
272        // just retry as the file is still being written, maybe next time we could read
273        // something
274        return HasNext.RETRY;
275      } else {
276        // the file is not being written so we are safe to just skip over it
277        dequeueCurrentLog();
278        return HasNext.RETRY_IMMEDIATELY;
279      }
280    } catch (LeaseNotRecoveredException e) {
281      // HBASE-15019 the WAL was not closed due to some hiccup.
282      LOG.warn("Try to recover the WAL lease " + nextPath, e);
283      AbstractFSWALProvider.recoverLease(conf, nextPath);
284      return HasNext.RETRY;
285    } catch (IOException | NullPointerException e) {
286      // For why we need to catch NPE here, see HDFS-4380 for more details
287      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
288      return HasNext.RETRY;
289    }
290  }
291
292  private HasNext lastAttempt() {
293    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
294      currentPositionOfEntry, state.resetCompression());
295    try {
296      resetReader();
297    } catch (IOException e) {
298      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
299        currentPositionOfEntry, state.resetCompression(), e);
300      // just leave the state as is, next time we will try to reset it again, but there is a
301      // nasty problem is that, we will still reach here finally and try reset again to see if
302      // the log has been fully replicated, which is redundant, can be optimized later
303      return HasNext.RETRY;
304    }
305    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
306    state = pair.getFirst();
307    // should not be written
308    assert !pair.getSecond();
309    if (!state.eof()) {
310      // we still have something to read after reopen, so return YES. Or there are something wrong
311      // and we need to retry
312      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
313    }
314    // No data available after reopen
315    if (checkAllBytesParsed()) {
316      // move to the next wal file and read
317      dequeueCurrentLog();
318      return HasNext.RETRY_IMMEDIATELY;
319    } else {
320      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
321      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
322      // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
323      // and read.
324      currentPositionOfEntry = 0;
325      currentPositionOfReader = 0;
326      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
327      return HasNext.RETRY;
328    }
329  }
330
331  private HasNext tryAdvanceEntry() {
332    HasNext prepared = prepareReader();
333    if (prepared != HasNext.YES) {
334      return prepared;
335    }
336
337    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
338    state = pair.getFirst();
339    boolean beingWritten = pair.getSecond();
340    LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
341      beingWritten);
342    // The below implementation needs to make sure that when beingWritten == true, we should not
343    // dequeue the current WAL file in logQueue.
344    switch (state) {
345      case NORMAL:
346        // everything is fine, just return
347        return HasNext.YES;
348      case EOF_WITH_TRAILER:
349        // in readNextEntryAndRecordReaderPosition, we will acquire rollWriteLock, and we can only
350        // schedule a close writer task, in which we will write trailer, under the rollWriteLock, so
351        // typically if beingWritten == true, we should not reach here, as we need to reopen the
352        // reader after writing the trailer. The only possible way to reach here while beingWritten
353        // == true is due to the inflightWALClosures logic in AbstractFSWAL, as if the writer is
354        // still in this map, we will consider it as beingWritten, but actually, here we could make
355        // sure that the new WAL file has already been enqueued into the logQueue, so here dequeuing
356        // the current log file is safe.
357        if (beingWritten && logQueue.getQueue(walGroupId).size() <= 1) {
358          // As explained above, if we implement everything correctly, we should not arrive here.
359          // But anyway, even if we reach here due to some code changes in the future, reading
360          // the file again can make sure that we will not accidentally consider the queue as
361          // finished, and since there is a trailer, we will soon consider the file as finished
362          // and move on.
363          LOG.warn(
364            "We have reached the trailer while reading the file '{}' which is currently"
365              + " beingWritten, but it is the last file in log queue {}. This should not happen"
366              + " typically, try to read again so we will not miss anything",
367            currentPath, walGroupId);
368          return HasNext.RETRY;
369        }
370        assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1;
371        // we have reached the trailer, which means this WAL file has been closed cleanly and we
372        // have finished reading it successfully, just move to the next WAL file and let the upper
373        // layer start reading the next WAL file
374        dequeueCurrentLog();
375        return HasNext.RETRY_IMMEDIATELY;
376      case EOF_AND_RESET:
377      case EOF_AND_RESET_COMPRESSION:
378        if (beingWritten) {
379          // just sleep a bit and retry to see if there are new entries coming since the file is
380          // still being written
381          return HasNext.RETRY;
382        }
383        // no more entries in this log file, and the file is already closed, i.e, rolled
384        // Before dequeuing, we should always get one more attempt at reading.
385        // This is in case more entries came in after we opened the reader, and the log is rolled
386        // while we were reading. See HBASE-6758
387        return lastAttempt();
388      case ERROR_AND_RESET:
389      case ERROR_AND_RESET_COMPRESSION:
390        // we have meet an error, just sleep a bit and retry again
391        return HasNext.RETRY;
392      default:
393        throw new IllegalArgumentException("Unknown read next result: " + state);
394    }
395  }
396
397  private FileStatus getCurrentPathFileStatus() throws IOException {
398    try {
399      return fs.getFileStatus(currentPath);
400    } catch (FileNotFoundException e) {
401      // try archived path
402      Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
403      if (archivedWAL != null) {
404        return fs.getFileStatus(archivedWAL);
405      } else {
406        throw e;
407      }
408    }
409  }
410
411  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
412  private boolean checkAllBytesParsed() {
413    // -1 means the wal wasn't closed cleanly.
414    final long trailerSize = currentTrailerSize();
415    FileStatus stat = null;
416    try {
417      stat = getCurrentPathFileStatus();
418    } catch (IOException e) {
419      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
420        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
421      metrics.incrUnknownFileLengthForClosedWAL();
422    }
423    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
424    // We only call this method when currentEntry is null so usually they are the same, but there
425    // are two exceptions. One is we have nothing in the file but only a header, in this way
426    // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
427    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
428    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
429    if (stat != null) {
430      if (trailerSize < 0) {
431        if (currentPositionOfReader < stat.getLen()) {
432          final long skippedBytes = stat.getLen() - currentPositionOfReader;
433          // See the commits in HBASE-25924/HBASE-25932 for context.
434          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
435            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
436          metrics.incrUncleanlyClosedWALs();
437          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
438        }
439      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
440        LOG.warn(
441          "Processing end of WAL {} at position {}, which is too far away from"
442            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
443          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
444        metrics.incrRestartedWALReading();
445        metrics.incrRepeatedFileBytes(currentPositionOfReader);
446        return false;
447      }
448    }
449    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
450      stat == null ? "N/A" : stat.getLen());
451    metrics.incrCompletedWAL();
452    return true;
453  }
454
455  private void dequeueCurrentLog() {
456    LOG.debug("EOF, closing {}", currentPath);
457    closeReader();
458    logQueue.remove(walGroupId);
459    setCurrentPath(null);
460    currentPositionOfEntry = 0;
461    state = null;
462  }
463
464  /**
465   * Returns whether the file is opened for writing.
466   */
467  private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
468    OptionalLong fileLength;
469    if (logQueue.getQueueSize(walGroupId) > 1) {
470      // if there are more than one files in queue, although it is possible that we are
471      // still trying to write the trailer of the file and it is not closed yet, we can
472      // make sure that we will not write any WAL entries to it any more, so it is safe
473      // to just let the upper layer try to read the whole file without limit
474      fileLength = OptionalLong.empty();
475    } else {
476      // if there is only one file in queue, check whether it is still being written to
477      // we must call this before actually reading from the reader, as this method will acquire the
478      // rollWriteLock. This is very important, as we will enqueue the new WAL file in postLogRoll,
479      // and before this happens, we could have already finished closing the previous WAL file. If
480      // we do not acquire the rollWriteLock and return whether the current file is being written
481      // to, we may finish reading the previous WAL file and start to read the next one, before it
482      // is enqueued into the logQueue, thus lead to an empty logQueue and make the shipper think
483      // the queue is already ended and quit. See HBASE-28114 and related issues for more details.
484      // in the future, if we want to optimize the logic here, for example, do not call this method
485      // every time, or do not acquire rollWriteLock in the implementation of this method, we need
486      // to carefully review the optimized implementation
487      fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
488    }
489    WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
490    long readerPos = readResult.getEntryEndPos();
491    Entry readEntry = readResult.getEntry();
492    if (readResult.getState() == WALTailingReader.State.NORMAL) {
493      LOG.trace("reading entry: {} ", readEntry);
494      metrics.incrLogEditsRead();
495      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
496      // record current entry and reader position
497      currentEntry = readResult.getEntry();
498      this.currentPositionOfReader = readerPos;
499    } else {
500      LOG.trace("reading entry failed with: {}", readResult.getState());
501      // set current entry to null
502      currentEntry = null;
503      try {
504        this.currentPositionOfReader = reader.getPosition();
505      } catch (IOException e) {
506        LOG.warn("failed to get current position of reader", e);
507        if (readResult.getState().resetCompression()) {
508          return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
509            fileLength.isPresent());
510        }
511      }
512    }
513    return Pair.newPair(readResult.getState(), fileLength.isPresent());
514  }
515
516  private void closeReader() {
517    if (reader != null) {
518      reader.close();
519      reader = null;
520    }
521  }
522
523  private long currentTrailerSize() {
524    long size = -1L;
525    if (reader instanceof AbstractProtobufWALReader) {
526      final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
527      size = pbwr.trailerSize();
528    }
529    return size;
530  }
531}