001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.UncheckedIOException;
024import java.util.OptionalLong;
025import java.util.concurrent.PriorityBlockingQueue;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
031import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
032import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
033import org.apache.hadoop.hbase.util.Pair;
034import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
035import org.apache.hadoop.hbase.wal.WAL.Entry;
036import org.apache.hadoop.hbase.wal.WALFactory;
037import org.apache.hadoop.hbase.wal.WALStreamReader;
038import org.apache.hadoop.hbase.wal.WALTailingReader;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.yetus.audience.InterfaceStability;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
046 * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
047 * dequeues it and starts reading from the next.
048 */
049@InterfaceAudience.Private
050@InterfaceStability.Evolving
051class WALEntryStream implements Closeable {
052  private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
053
054  private WALTailingReader reader;
055  private WALTailingReader.State state;
056  private Path currentPath;
057  // cache of next entry for hasNext()
058  private Entry currentEntry;
059  // position for the current entry. As now we support peek, which means that the upper layer may
060  // choose to return before reading the current entry, so it is not safe to return the value below
061  // in getPosition.
062  private long currentPositionOfEntry = 0;
063  // position after reading current entry
064  private long currentPositionOfReader = 0;
065  private final ReplicationSourceLogQueue logQueue;
066  private final String walGroupId;
067  private final FileSystem fs;
068  private final Configuration conf;
069  private final WALFileLengthProvider walFileLengthProvider;
070  private final MetricsSource metrics;
071
072  // we should be able to skip empty WAL files, but for safety, we still provide this config
073  // see HBASE-18137 for more details
074  private boolean eofAutoRecovery;
075
076  /**
077   * Create an entry stream over the given queue at the given start position
078   * @param logQueue              the queue of WAL paths
079   * @param conf                  the {@link Configuration} to use to create {@link WALStreamReader}
080   *                              for this stream
081   * @param startPosition         the position in the first WAL to start reading at
082   * @param walFileLengthProvider provides the length of the WAL file
083   * @param serverName            the server name which all WALs belong to
084   * @param metrics               the replication metrics
085   */
086  public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
087    long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
088    String walGroupId) {
089    this.logQueue = logQueue;
090    this.fs = fs;
091    this.conf = conf;
092    this.currentPositionOfEntry = startPosition;
093    this.walFileLengthProvider = walFileLengthProvider;
094    this.metrics = metrics;
095    this.walGroupId = walGroupId;
096    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
097  }
098
099  public enum HasNext {
100    /** means there is a new entry and you could use peek or next to get current entry */
101    YES,
102    /**
103     * means there are something wrong or we have reached EOF of the current file but it is not
104     * closed yet and there is no new file in the replication queue yet, you should sleep a while
105     * and try to call hasNext again
106     */
107    RETRY,
108    /**
109     * Usually this means we have finished reading a WAL file, and for simplify the implementation
110     * of this class, we just let the upper layer issue a new hasNext call again to open the next
111     * WAL file.
112     */
113    RETRY_IMMEDIATELY,
114    /**
115     * means there is no new entry and stream is end, the upper layer should close this stream and
116     * release other resources as well
117     */
118    NO
119  }
120
121  /**
122   * Try advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for more
123   * details about the meanings of the return values.
124   * <p/>
125   * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
126   * returns {@link HasNext#YES}.
127   */
128  public HasNext hasNext() {
129    if (currentEntry == null) {
130      return tryAdvanceEntry();
131    } else {
132      return HasNext.YES;
133    }
134  }
135
136  /**
137   * Returns the next WAL entry in this stream but does not advance.
138   * <p/>
139   * Must call {@link #hasNext()} first before calling this method, and if you have already called
140   * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
141   * advance the stream before calling this method again, otherwise it will always return
142   * {@code null}
143   * <p/>
144   * The reason here is that, we need to use the return value of {@link #hasNext()} to tell upper
145   * layer to retry or not, so we can not wrap the {@link #hasNext()} call inside {@link #peek()} or
146   * {@link #next()} as they have their own return value.
147   * @see #hasNext()
148   * @see #next()
149   */
150  public Entry peek() {
151    return currentEntry;
152  }
153
154  /**
155   * Returns the next WAL entry in this stream and advance the stream. Will throw
156   * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method.
157   * Please see the javadoc of {@link #peek()} method to see why we need this.
158   * @throws IllegalStateException Every time you want to call this method, please call
159   *                               {@link #hasNext()} first, otherwise a
160   *                               {@link IllegalStateException} will be thrown.
161   * @see #hasNext()
162   * @see #peek()
163   */
164  public Entry next() {
165    if (currentEntry == null) {
166      throw new IllegalStateException("Call hasNext first");
167    }
168    Entry save = peek();
169    currentPositionOfEntry = currentPositionOfReader;
170    currentEntry = null;
171    state = null;
172    return save;
173  }
174
175  /**
176   * {@inheritDoc}
177   */
178  @Override
179  public void close() {
180    closeReader();
181  }
182
183  /** Returns the position of the last Entry returned by next() */
184  public long getPosition() {
185    return currentPositionOfEntry;
186  }
187
188  /** Returns the {@link Path} of the current WAL */
189  public Path getCurrentPath() {
190    return currentPath;
191  }
192
193  private String getCurrentPathStat() {
194    StringBuilder sb = new StringBuilder();
195    if (currentPath != null) {
196      sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
197        .append(currentPositionOfEntry).append("\n");
198    } else {
199      sb.append("no replication ongoing, waiting for new log");
200    }
201    return sb.toString();
202  }
203
204  private void setCurrentPath(Path path) {
205    this.currentPath = path;
206  }
207
208  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
209      justification = "HDFS-4380")
210  private HasNext prepareReader() {
211    if (reader != null) {
212      if (state != null && state != WALTailingReader.State.NORMAL) {
213        // reset before reading
214        LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
215          currentPositionOfEntry, state.resetCompression());
216        try {
217          if (currentPositionOfEntry > 0) {
218            reader.resetTo(currentPositionOfEntry, state.resetCompression());
219          } else {
220            // we will read from the beginning so we should always clear the compression context
221            reader.resetTo(-1, true);
222          }
223        } catch (FileNotFoundException e) {
224          // For now, this could happen only when reading meta wal for meta replicas.
225          // In this case, raising UncheckedIOException will let the endpoint deal with resetting
226          // the replication source. See HBASE-27871.
227          throw new UncheckedIOException(e);
228        } catch (IOException e) {
229          LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
230            currentPositionOfEntry, state.resetCompression(), e);
231          // just leave the state as is, and try resetting next time
232          return HasNext.RETRY;
233        }
234      } else {
235        return HasNext.YES;
236      }
237    }
238    // try open next WAL file
239    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
240    Path nextPath = queue.peek();
241    if (nextPath == null) {
242      LOG.debug("No more WAL files in queue");
243      // no more files in queue, this could happen for recovered queue, or for a wal group of a
244      // sync replication peer which has already been transited to DA or S.
245      setCurrentPath(null);
246      return HasNext.NO;
247    }
248    setCurrentPath(nextPath);
249    // we need to test this prior to create the reader. If not, it is possible that, while
250    // opening the file, the file is still being written so its header is incomplete and we get
251    // a header EOF, but then while we test whether it is still being written, we have already
252    // flushed the data out and we consider it is not being written, and then we just skip over
253    // file, then we will lose the data written after opening...
254    boolean beingWritten = walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
255    LOG.debug("Creating new reader {}, startPosition={}, beingWritten={}", nextPath,
256      currentPositionOfEntry, beingWritten);
257    try {
258      reader = WALFactory.createTailingReader(fs, nextPath, conf,
259        currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
260      return HasNext.YES;
261    } catch (WALHeaderEOFException e) {
262      if (!eofAutoRecovery) {
263        // if we do not enable EOF auto recovery, just let the upper layer retry
264        // the replication will be stuck usually, and need to be fixed manually
265        return HasNext.RETRY;
266      }
267      // we hit EOF while reading the WAL header, usually this means we can just skip over this
268      // file, but we need to be careful that whether this file is still being written, if so we
269      // should retry instead of skipping.
270      LOG.warn("EOF while trying to open WAL reader for path: {}, startPosition={}", nextPath,
271        currentPositionOfEntry, e);
272      if (beingWritten) {
273        // just retry as the file is still being written, maybe next time we could read
274        // something
275        return HasNext.RETRY;
276      } else {
277        // the file is not being written so we are safe to just skip over it
278        dequeueCurrentLog();
279        return HasNext.RETRY_IMMEDIATELY;
280      }
281    } catch (LeaseNotRecoveredException e) {
282      // HBASE-15019 the WAL was not closed due to some hiccup.
283      LOG.warn("Try to recover the WAL lease " + nextPath, e);
284      AbstractFSWALProvider.recoverLease(conf, nextPath);
285      return HasNext.RETRY;
286    } catch (IOException | NullPointerException e) {
287      // For why we need to catch NPE here, see HDFS-4380 for more details
288      LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
289      return HasNext.RETRY;
290    }
291  }
292
293  private HasNext lastAttempt() {
294    LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
295      currentPositionOfEntry, state.resetCompression());
296    try {
297      reader.resetTo(currentPositionOfEntry, state.resetCompression());
298    } catch (IOException e) {
299      LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
300        currentPositionOfEntry, state.resetCompression(), e);
301      // just leave the state as is, next time we will try to reset it again, but there is a
302      // nasty problem is that, we will still reach here finally and try reset again to see if
303      // the log has been fully replicated, which is redundant, can be optimized later
304      return HasNext.RETRY;
305    }
306    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
307    state = pair.getFirst();
308    // should not be written
309    assert !pair.getSecond();
310    if (!state.eof()) {
311      // we still have something to read after reopen, so return YES. Or there are something wrong
312      // and we need to retry
313      return state == WALTailingReader.State.NORMAL ? HasNext.YES : HasNext.RETRY;
314    }
315    // No data available after reopen
316    if (checkAllBytesParsed()) {
317      // move to the next wal file and read
318      dequeueCurrentLog();
319      return HasNext.RETRY_IMMEDIATELY;
320    } else {
321      // see HBASE-15983, if checkAllBytesParsed returns false, we need to try read from
322      // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
323      // so when calling tryAdvanceENtry next time we will reset the reader to the beginning
324      // and read.
325      currentPositionOfEntry = 0;
326      currentPositionOfReader = 0;
327      state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
328      return HasNext.RETRY;
329    }
330  }
331
332  private HasNext tryAdvanceEntry() {
333    HasNext prepared = prepareReader();
334    if (prepared != HasNext.YES) {
335      return prepared;
336    }
337
338    Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
339    state = pair.getFirst();
340    boolean beingWritten = pair.getSecond();
341    LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
342      beingWritten);
343    // The below implementation needs to make sure that when beingWritten == true, we should not
344    // dequeue the current WAL file in logQueue.
345    switch (state) {
346      case NORMAL:
347        // everything is fine, just return
348        return HasNext.YES;
349      case EOF_WITH_TRAILER:
350        // in readNextEntryAndRecordReaderPosition, we will acquire rollWriteLock, and we can only
351        // schedule a close writer task, in which we will write trailer, under the rollWriteLock, so
352        // typically if beingWritten == true, we should not reach here, as we need to reopen the
353        // reader after writing the trailer. The only possible way to reach here while beingWritten
354        // == true is due to the inflightWALClosures logic in AbstractFSWAL, as if the writer is
355        // still in this map, we will consider it as beingWritten, but actually, here we could make
356        // sure that the new WAL file has already been enqueued into the logQueue, so here dequeuing
357        // the current log file is safe.
358        if (beingWritten && logQueue.getQueue(walGroupId).size() <= 1) {
359          // As explained above, if we implement everything correctly, we should not arrive here.
360          // But anyway, even if we reach here due to some code changes in the future, reading
361          // the file again can make sure that we will not accidentally consider the queue as
362          // finished, and since there is a trailer, we will soon consider the file as finished
363          // and move on.
364          LOG.warn(
365            "We have reached the trailer while reading the file '{}' which is currently"
366              + " beingWritten, but it is the last file in log queue {}. This should not happen"
367              + " typically, try to read again so we will not miss anything",
368            currentPath, walGroupId);
369          return HasNext.RETRY;
370        }
371        assert !beingWritten || logQueue.getQueue(walGroupId).size() > 1;
372        // we have reached the trailer, which means this WAL file has been closed cleanly and we
373        // have finished reading it successfully, just move to the next WAL file and let the upper
374        // layer start reading the next WAL file
375        dequeueCurrentLog();
376        return HasNext.RETRY_IMMEDIATELY;
377      case EOF_AND_RESET:
378      case EOF_AND_RESET_COMPRESSION:
379        if (beingWritten) {
380          // just sleep a bit and retry to see if there are new entries coming since the file is
381          // still being written
382          return HasNext.RETRY;
383        }
384        // no more entries in this log file, and the file is already closed, i.e, rolled
385        // Before dequeuing, we should always get one more attempt at reading.
386        // This is in case more entries came in after we opened the reader, and the log is rolled
387        // while we were reading. See HBASE-6758
388        return lastAttempt();
389      case ERROR_AND_RESET:
390      case ERROR_AND_RESET_COMPRESSION:
391        // we have meet an error, just sleep a bit and retry again
392        return HasNext.RETRY;
393      default:
394        throw new IllegalArgumentException("Unknown read next result: " + state);
395    }
396  }
397
398  private FileStatus getCurrentPathFileStatus() throws IOException {
399    try {
400      return fs.getFileStatus(currentPath);
401    } catch (FileNotFoundException e) {
402      // try archived path
403      Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
404      if (archivedWAL != null) {
405        return fs.getFileStatus(archivedWAL);
406      } else {
407        throw e;
408      }
409    }
410  }
411
412  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
413  private boolean checkAllBytesParsed() {
414    // -1 means the wal wasn't closed cleanly.
415    final long trailerSize = currentTrailerSize();
416    FileStatus stat = null;
417    try {
418      stat = getCurrentPathFileStatus();
419    } catch (IOException e) {
420      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
421        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
422      metrics.incrUnknownFileLengthForClosedWAL();
423    }
424    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
425    // We only call this method when currentEntry is null so usually they are the same, but there
426    // are two exceptions. One is we have nothing in the file but only a header, in this way
427    // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
428    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
429    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
430    if (stat != null) {
431      if (trailerSize < 0) {
432        if (currentPositionOfReader < stat.getLen()) {
433          final long skippedBytes = stat.getLen() - currentPositionOfReader;
434          // See the commits in HBASE-25924/HBASE-25932 for context.
435          LOG.warn("Reached the end of WAL {}. It was not closed cleanly,"
436            + " so we did not parse {} bytes of data.", currentPath, skippedBytes);
437          metrics.incrUncleanlyClosedWALs();
438          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
439        }
440      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
441        LOG.warn(
442          "Processing end of WAL {} at position {}, which is too far away from"
443            + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
444          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
445        metrics.incrRestartedWALReading();
446        metrics.incrRepeatedFileBytes(currentPositionOfReader);
447        return false;
448      }
449    }
450    LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
451      stat == null ? "N/A" : stat.getLen());
452    metrics.incrCompletedWAL();
453    return true;
454  }
455
456  private void dequeueCurrentLog() {
457    LOG.debug("EOF, closing {}", currentPath);
458    closeReader();
459    logQueue.remove(walGroupId);
460    setCurrentPath(null);
461    currentPositionOfEntry = 0;
462    state = null;
463  }
464
465  /**
466   * Returns whether the file is opened for writing.
467   */
468  private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
469    OptionalLong fileLength;
470    if (logQueue.getQueueSize(walGroupId) > 1) {
471      // if there are more than one files in queue, although it is possible that we are
472      // still trying to write the trailer of the file and it is not closed yet, we can
473      // make sure that we will not write any WAL entries to it any more, so it is safe
474      // to just let the upper layer try to read the whole file without limit
475      fileLength = OptionalLong.empty();
476    } else {
477      // if there is only one file in queue, check whether it is still being written to
478      // we must call this before actually reading from the reader, as this method will acquire the
479      // rollWriteLock. This is very important, as we will enqueue the new WAL file in postLogRoll,
480      // and before this happens, we could have already finished closing the previous WAL file. If
481      // we do not acquire the rollWriteLock and return whether the current file is being written
482      // to, we may finish reading the previous WAL file and start to read the next one, before it
483      // is enqueued into the logQueue, thus lead to an empty logQueue and make the shipper think
484      // the queue is already ended and quit. See HBASE-28114 and related issues for more details.
485      // in the future, if we want to optimize the logic here, for example, do not call this method
486      // every time, or do not acquire rollWriteLock in the implementation of this method, we need
487      // to carefully review the optimized implementation
488      fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
489    }
490    WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
491    long readerPos = readResult.getEntryEndPos();
492    Entry readEntry = readResult.getEntry();
493    if (readResult.getState() == WALTailingReader.State.NORMAL) {
494      LOG.trace("reading entry: {} ", readEntry);
495      metrics.incrLogEditsRead();
496      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
497      // record current entry and reader position
498      currentEntry = readResult.getEntry();
499      this.currentPositionOfReader = readerPos;
500    } else {
501      LOG.trace("reading entry failed with: {}", readResult.getState());
502      // set current entry to null
503      currentEntry = null;
504      try {
505        this.currentPositionOfReader = reader.getPosition();
506      } catch (IOException e) {
507        LOG.warn("failed to get current position of reader", e);
508        if (readResult.getState().resetCompression()) {
509          return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
510            fileLength.isPresent());
511        }
512      }
513    }
514    return Pair.newPair(readResult.getState(), fileLength.isPresent());
515  }
516
517  private void closeReader() {
518    if (reader != null) {
519      reader.close();
520      reader = null;
521    }
522  }
523
524  private long currentTrailerSize() {
525    long size = -1L;
526    if (reader instanceof AbstractProtobufWALReader) {
527      final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
528      size = pbwr.trailerSize();
529    }
530    return size;
531  }
532}