001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath; 021import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath; 022 023import java.io.EOFException; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.CellUtil; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.log.HBaseMarkers; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; 042 043@InterfaceAudience.Private 044abstract class AbstractRecoveredEditsOutputSink extends OutputSink { 045 private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); 046 private final WALSplitter walSplitter; 047 private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>(); 048 049 public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter, 050 WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { 051 super(controller, entryBuffers, numWriters); 052 this.walSplitter = walSplitter; 053 } 054 055 /** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */ 056 protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region, 057 long seqId) throws IOException { 058 Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId, 059 walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(), 060 walSplitter.conf); 061 if (walSplitter.walFS.exists(regionEditsPath)) { 062 LOG.warn("Found old edits file. It could be the " 063 + "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length=" 064 + walSplitter.walFS.getFileStatus(regionEditsPath).getLen()); 065 if (!walSplitter.walFS.delete(regionEditsPath, false)) { 066 LOG.warn("Failed delete of old {}", regionEditsPath); 067 } 068 } 069 WALProvider.Writer w = walSplitter.createWriter(regionEditsPath); 070 final String msg = "Creating recovered edits writer path=" + regionEditsPath; 071 LOG.info(msg); 072 updateStatusWithMsg(msg); 073 return new RecoveredEditsWriter(region, regionEditsPath, w, seqId); 074 } 075 076 /** 077 * abortRecoveredEditsWriter closes the editsWriter, but does not rename and finalize the 078 * recovered edits WAL files. Please see HBASE-28569. 079 */ 080 protected void abortRecoveredEditsWriter(RecoveredEditsWriter editsWriter, 081 List<IOException> thrown) { 082 closeRecoveredEditsWriter(editsWriter, thrown); 083 try { 084 removeRecoveredEditsFile(editsWriter); 085 } catch (IOException ioe) { 086 final String errorMsg = "Failed removing recovered edits file at " + editsWriter.path; 087 LOG.error(errorMsg); 088 updateStatusWithMsg(errorMsg); 089 } 090 } 091 092 protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter editsWriter, 093 List<IOException> thrown) throws IOException { 094 if (!closeRecoveredEditsWriter(editsWriter, thrown)) { 095 return null; 096 } 097 if (editsWriter.editsWritten == 0) { 098 // just remove the empty recovered.edits file 099 removeRecoveredEditsFile(editsWriter); 100 return null; 101 } 102 103 Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path, 104 regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName))); 105 try { 106 if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) { 107 deleteOneWithFewerEntries(editsWriter, dst); 108 } 109 // Skip the unit tests which create a splitter that reads and 110 // writes the data without touching disk. 111 // TestHLogSplit#testThreading is an example. 112 if (walSplitter.walFS.exists(editsWriter.path)) { 113 if (!walSplitter.walFS.rename(editsWriter.path, dst)) { 114 final String errorMsg = 115 "Failed renaming recovered edits " + editsWriter.path + " to " + dst; 116 updateStatusWithMsg(errorMsg); 117 throw new IOException(errorMsg); 118 } 119 final String renameEditMsg = "Rename recovered edits " + editsWriter.path + " to " + dst; 120 LOG.info(renameEditMsg); 121 updateStatusWithMsg(renameEditMsg); 122 } 123 } catch (IOException ioe) { 124 final String errorMsg = "Could not rename recovered edits " + editsWriter.path + " to " + dst; 125 LOG.error(errorMsg, ioe); 126 updateStatusWithMsg(errorMsg); 127 thrown.add(ioe); 128 return null; 129 } 130 return dst; 131 } 132 133 private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, 134 List<IOException> thrown) { 135 try { 136 editsWriter.writer.close(); 137 } catch (IOException ioe) { 138 final String errorMsg = "Could not close recovered edits at " + editsWriter.path; 139 LOG.error(errorMsg, ioe); 140 updateStatusWithMsg(errorMsg); 141 thrown.add(ioe); 142 return false; 143 } 144 final String msg = "Closed recovered edits writer path=" + editsWriter.path + " (wrote " 145 + editsWriter.editsWritten + " edits, skipped " + editsWriter.editsSkipped + " edits in " 146 + (editsWriter.nanosSpent / 1000 / 1000) + " ms)"; 147 LOG.info(msg); 148 updateStatusWithMsg(msg); 149 return true; 150 } 151 152 private void removeRecoveredEditsFile(RecoveredEditsWriter editsWriter) throws IOException { 153 if ( 154 walSplitter.walFS.exists(editsWriter.path) 155 && !walSplitter.walFS.delete(editsWriter.path, false) 156 ) { 157 final String errorMsg = "Failed deleting empty " + editsWriter.path; 158 LOG.warn(errorMsg); 159 updateStatusWithMsg(errorMsg); 160 throw new IOException("Failed deleting empty " + editsWriter.path); 161 } 162 } 163 164 @Override 165 public boolean keepRegionEvent(WAL.Entry entry) { 166 ArrayList<Cell> cells = entry.getEdit().getCells(); 167 for (Cell cell : cells) { 168 if (WALEdit.isCompactionMarker(cell)) { 169 return true; 170 } 171 } 172 return false; 173 } 174 175 /** 176 * Update region's maximum edit log SeqNum. 177 */ 178 void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { 179 synchronized (regionMaximumEditLogSeqNum) { 180 String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); 181 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); 182 if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { 183 regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); 184 } 185 } 186 } 187 188 // delete the one with fewer wal entries 189 private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst) 190 throws IOException { 191 long dstMinLogSeqNum = -1L; 192 try (WALStreamReader reader = 193 walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) { 194 WAL.Entry entry = reader.next(); 195 if (entry != null) { 196 dstMinLogSeqNum = entry.getKey().getSequenceId(); 197 } 198 } catch (EOFException e) { 199 LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, 200 e); 201 } 202 if (editsWriter.minLogSeqNum < dstMinLogSeqNum) { 203 LOG.warn("Found existing old edits file. It could be the result of a previous failed" 204 + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" 205 + walSplitter.walFS.getFileStatus(dst).getLen()); 206 if (!walSplitter.walFS.delete(dst, false)) { 207 LOG.warn("Failed deleting of old {}", dst); 208 throw new IOException("Failed deleting of old " + dst); 209 } 210 } else { 211 LOG 212 .warn("Found existing old edits file and we have less entries. Deleting " + editsWriter.path 213 + ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen()); 214 if (!walSplitter.walFS.delete(editsWriter.path, false)) { 215 LOG.warn("Failed deleting of {}", editsWriter.path); 216 throw new IOException("Failed deleting of " + editsWriter.path); 217 } 218 } 219 } 220 221 /** 222 * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting 223 * statistics about the data written to this output. 224 */ 225 final class RecoveredEditsWriter { 226 /* Count of edits written to this path */ 227 long editsWritten = 0; 228 /* Count of edits skipped to this path */ 229 long editsSkipped = 0; 230 /* Number of nanos spent writing to this log */ 231 long nanosSpent = 0; 232 233 final byte[] encodedRegionName; 234 final Path path; 235 final WALProvider.Writer writer; 236 final long minLogSeqNum; 237 238 RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer, 239 long minLogSeqNum) { 240 this.encodedRegionName = encodedRegionName; 241 this.path = path; 242 this.writer = writer; 243 this.minLogSeqNum = minLogSeqNum; 244 } 245 246 private void incrementEdits(int edits) { 247 editsWritten += edits; 248 } 249 250 private void incrementSkippedEdits(int skipped) { 251 editsSkipped += skipped; 252 totalSkippedEdits.addAndGet(skipped); 253 } 254 255 private void incrementNanoTime(long nanos) { 256 nanosSpent += nanos; 257 } 258 259 void writeRegionEntries(List<WAL.Entry> entries) throws IOException { 260 long startTime = System.nanoTime(); 261 int editsCount = 0; 262 for (WAL.Entry logEntry : entries) { 263 filterCellByStore(logEntry); 264 if (!logEntry.getEdit().isEmpty()) { 265 try { 266 writer.append(logEntry); 267 } catch (IOException e) { 268 logAndThrowWriterAppendFailure(logEntry, e); 269 } 270 updateRegionMaximumEditLogSeqNum(logEntry); 271 editsCount++; 272 } else { 273 incrementSkippedEdits(1); 274 } 275 } 276 // Pass along summary statistics 277 incrementEdits(editsCount); 278 incrementNanoTime(System.nanoTime() - startTime); 279 } 280 281 private void logAndThrowWriterAppendFailure(WAL.Entry logEntry, IOException e) 282 throws IOException { 283 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; 284 final String errorMsg = "Failed to write log entry " + logEntry.toString() + " to log"; 285 LOG.error(HBaseMarkers.FATAL, errorMsg, e); 286 updateStatusWithMsg(errorMsg); 287 throw e; 288 } 289 290 private void filterCellByStore(WAL.Entry logEntry) { 291 Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores() 292 .get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); 293 if (MapUtils.isEmpty(maxSeqIdInStores)) { 294 return; 295 } 296 // Create the array list for the cells that aren't filtered. 297 // We make the assumption that most cells will be kept. 298 ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); 299 for (Cell cell : logEntry.getEdit().getCells()) { 300 if (WALEdit.isMetaEditFamily(cell)) { 301 keptCells.add(cell); 302 } else { 303 byte[] family = CellUtil.cloneFamily(cell); 304 Long maxSeqId = maxSeqIdInStores.get(family); 305 // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, 306 // or the master was crashed before and we can not get the information. 307 if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { 308 keptCells.add(cell); 309 } 310 } 311 } 312 313 // Anything in the keptCells array list is still live. 314 // So rather than removing the cells from the array list 315 // which would be an O(n^2) operation, we just replace the list 316 logEntry.getEdit().setCells(keptCells); 317 } 318 } 319}