001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
021import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
022
023import java.io.EOFException;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.log.HBaseMarkers;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.ipc.RemoteException;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
042
043@InterfaceAudience.Private
044abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
  // NOTE(review): this logger is registered under RecoveredEditsOutputSink rather than under this
  // abstract class, so everything logged here appears under that logger name -- confirm that this
  // is intentional.
  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
  // Splitter that supplies the WAL file system, configuration, writer/reader factories and the
  // per-store maximum sequence ids consulted by this sink.
  private final WALSplitter walSplitter;
  // Largest sequence id recorded per region through updateRegionMaximumEditLogSeqNum, keyed by the
  // encoded region name converted to a String.
  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();
048
  /**
   * Creates the sink, handing the controller, entry buffers and writer count to the
   * {@link OutputSink} constructor and keeping a reference to the splitter.
   * @param walSplitter  splitter whose file system, configuration and writer factory are used when
   *                     recovered edits files are created and finalized
   * @param controller   passed through to the {@link OutputSink} constructor
   * @param entryBuffers passed through to the {@link OutputSink} constructor
   * @param numWriters   passed through to the {@link OutputSink} constructor
   */
  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
    WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }
054
055  /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */
056  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
057    long seqId) throws IOException {
058    Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
059      walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
060      walSplitter.conf);
061    if (walSplitter.walFS.exists(regionEditsPath)) {
062      LOG.warn("Found old edits file. It could be the "
063        + "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length="
064        + walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
065      if (!walSplitter.walFS.delete(regionEditsPath, false)) {
066        LOG.warn("Failed delete of old {}", regionEditsPath);
067      }
068    }
069    WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
070    final String msg = "Creating recovered edits writer path=" + regionEditsPath;
071    LOG.info(msg);
072    updateStatusWithMsg(msg);
073    return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
074  }
075
076  /**
077   * abortRecoveredEditsWriter closes the editsWriter, but does not rename and finalize the
078   * recovered edits WAL files. Please see HBASE-28569.
079   */
080  protected void abortRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
081    List<IOException> thrown) {
082    closeRecoveredEditsWriter(editsWriter, thrown);
083    try {
084      removeRecoveredEditsFile(editsWriter);
085    } catch (IOException ioe) {
086      final String errorMsg = "Failed removing recovered edits file at " + editsWriter.path;
087      LOG.error(errorMsg);
088      updateStatusWithMsg(errorMsg);
089    }
090  }
091
092  protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter editsWriter,
093    List<IOException> thrown) throws IOException {
094    if (!closeRecoveredEditsWriter(editsWriter, thrown)) {
095      return null;
096    }
097    if (editsWriter.editsWritten == 0) {
098      // just remove the empty recovered.edits file
099      removeRecoveredEditsFile(editsWriter);
100      return null;
101    }
102
103    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
104      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
105    try {
106      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
107        deleteOneWithFewerEntries(editsWriter, dst);
108      }
109      // Skip the unit tests which create a splitter that reads and
110      // writes the data without touching disk.
111      // TestHLogSplit#testThreading is an example.
112      if (walSplitter.walFS.exists(editsWriter.path)) {
113        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
114          final String errorMsg =
115            "Failed renaming recovered edits " + editsWriter.path + " to " + dst;
116          updateStatusWithMsg(errorMsg);
117          throw new IOException(errorMsg);
118        }
119        final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst;
120        LOG.info(renameEditMsg);
121        updateStatusWithMsg(renameEditMsg);
122      }
123    } catch (IOException ioe) {
124      final String errorMsg = "Could not rename recovered edits " + editsWriter.path + " to " + dst;
125      LOG.error(errorMsg, ioe);
126      updateStatusWithMsg(errorMsg);
127      thrown.add(ioe);
128      return null;
129    }
130    return dst;
131  }
132
133  private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
134    List<IOException> thrown) {
135    try {
136      editsWriter.writer.close();
137    } catch (IOException ioe) {
138      final String errorMsg = "Could not close recovered edits at " + editsWriter.path;
139      LOG.error(errorMsg, ioe);
140      updateStatusWithMsg(errorMsg);
141      thrown.add(ioe);
142      return false;
143    }
144    final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote "
145      + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in "
146      + (editsWriter.nanosSpent / 1000 / 1000) + " ms)";
147    LOG.info(msg);
148    updateStatusWithMsg(msg);
149    return true;
150  }
151
152  private void removeRecoveredEditsFile(RecoveredEditsWriter editsWriter) throws IOException {
153    if (
154      walSplitter.walFS.exists(editsWriter.path)
155        && !walSplitter.walFS.delete(editsWriter.path, false)
156    ) {
157      final String errorMsg = "Failed deleting empty " + editsWriter.path;
158      LOG.warn(errorMsg);
159      updateStatusWithMsg(errorMsg);
160      throw new IOException("Failed deleting empty  " + editsWriter.path);
161    }
162  }
163
164  @Override
165  public boolean keepRegionEvent(WAL.Entry entry) {
166    ArrayList<Cell> cells = entry.getEdit().getCells();
167    for (Cell cell : cells) {
168      if (WALEdit.isCompactionMarker(cell)) {
169        return true;
170      }
171    }
172    return false;
173  }
174
175  /**
176   * Update region's maximum edit log SeqNum.
177   */
178  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
179    synchronized (regionMaximumEditLogSeqNum) {
180      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
181      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
182      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
183        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
184      }
185    }
186  }
187
188  // delete the one with fewer wal entries
189  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
190    throws IOException {
191    long dstMinLogSeqNum = -1L;
192    try (WALStreamReader reader =
193      walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
194      WAL.Entry entry = reader.next();
195      if (entry != null) {
196        dstMinLogSeqNum = entry.getKey().getSequenceId();
197      }
198    } catch (EOFException e) {
199      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
200        e);
201    }
202    if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
203      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
204        + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
205        + walSplitter.walFS.getFileStatus(dst).getLen());
206      if (!walSplitter.walFS.delete(dst, false)) {
207        LOG.warn("Failed deleting of old {}", dst);
208        throw new IOException("Failed deleting of old " + dst);
209      }
210    } else {
211      LOG
212        .warn("Found existing old edits file and we have less entries. Deleting " + editsWriter.path
213          + ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
214      if (!walSplitter.walFS.delete(editsWriter.path, false)) {
215        LOG.warn("Failed deleting of {}", editsWriter.path);
216        throw new IOException("Failed deleting of " + editsWriter.path);
217      }
218    }
219  }
220
221  /**
222   * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
223   * statistics about the data written to this output.
224   */
225  final class RecoveredEditsWriter {
226    /* Count of edits written to this path */
227    long editsWritten = 0;
228    /* Count of edits skipped to this path */
229    long editsSkipped = 0;
230    /* Number of nanos spent writing to this log */
231    long nanosSpent = 0;
232
233    final byte[] encodedRegionName;
234    final Path path;
235    final WALProvider.Writer writer;
236    final long minLogSeqNum;
237
238    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
239      long minLogSeqNum) {
240      this.encodedRegionName = encodedRegionName;
241      this.path = path;
242      this.writer = writer;
243      this.minLogSeqNum = minLogSeqNum;
244    }
245
246    private void incrementEdits(int edits) {
247      editsWritten += edits;
248    }
249
250    private void incrementSkippedEdits(int skipped) {
251      editsSkipped += skipped;
252      totalSkippedEdits.addAndGet(skipped);
253    }
254
255    private void incrementNanoTime(long nanos) {
256      nanosSpent += nanos;
257    }
258
259    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
260      long startTime = System.nanoTime();
261      int editsCount = 0;
262      for (WAL.Entry logEntry : entries) {
263        filterCellByStore(logEntry);
264        if (!logEntry.getEdit().isEmpty()) {
265          try {
266            writer.append(logEntry);
267          } catch (IOException e) {
268            logAndThrowWriterAppendFailure(logEntry, e);
269          }
270          updateRegionMaximumEditLogSeqNum(logEntry);
271          editsCount++;
272        } else {
273          incrementSkippedEdits(1);
274        }
275      }
276      // Pass along summary statistics
277      incrementEdits(editsCount);
278      incrementNanoTime(System.nanoTime() - startTime);
279    }
280
281    private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e)
282      throws IOException {
283      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
284      final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log";
285      LOG.error(HBaseMarkers.FATAL, errorMsg, e);
286      updateStatusWithMsg(errorMsg);
287      throw e;
288    }
289
290    private void filterCellByStore(WAL.Entry logEntry) {
291      Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
292        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
293      if (MapUtils.isEmpty(maxSeqIdInStores)) {
294        return;
295      }
296      // Create the array list for the cells that aren't filtered.
297      // We make the assumption that most cells will be kept.
298      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
299      for (Cell cell : logEntry.getEdit().getCells()) {
300        if (WALEdit.isMetaEditFamily(cell)) {
301          keptCells.add(cell);
302        } else {
303          byte[] family = CellUtil.cloneFamily(cell);
304          Long maxSeqId = maxSeqIdInStores.get(family);
305          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
306          // or the master was crashed before and we can not get the information.
307          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
308            keptCells.add(cell);
309          }
310        }
311      }
312
313      // Anything in the keptCells array list is still live.
314      // So rather than removing the cells from the array list
315      // which would be an O(n^2) operation, we just replace the list
316      logEntry.getEdit().setCells(keptCells);
317    }
318  }
319}