/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
 */
@InterfaceAudience.Public
public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
  private static final Logger LOG = LoggerFactory.getLogger(WALInputFormat.class);

  public static final String START_TIME_KEY = "wal.start.time";
  public static final String END_TIME_KEY = "wal.end.time";

  /**
   * {@link InputSplit} for {@link WAL} files. Each split represents exactly one log file.
   */
  static class WALSplit extends InputSplit implements Writable {
    private String logFileName;
    private long fileSize;
    private long startTime;
    private long endTime;

    /** for serialization */
    public WALSplit() {
    }

    /**
     * Represents a WALSplit, i.e. a single WAL file. Start and end time are managed by the
     * split, so that WAL files can be filtered before WALEdits are passed to the mapper(s).
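     * Entries with a write time outside the [startTime, endTime] range are skipped by the
     * record reader at read time rather than at split time.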
     */
    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
      this.logFileName = logFileName;
      this.fileSize = fileSize;
      this.startTime = startTime;
      this.endTime = endTime;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return fileSize;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      // TODO: Find the data node with the most blocks for this WAL?
      return new String[] {};
    }

    public String getLogFileName() {
      return logFileName;
    }

    public long getStartTime() {
      return startTime;
    }

    public long getEndTime() {
      return endTime;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      logFileName = in.readUTF();
      fileSize = in.readLong();
      startTime = in.readLong();
      endTime = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(logFileName);
      out.writeLong(fileSize);
      out.writeLong(startTime);
      out.writeLong(endTime);
    }

    @Override
    public String toString() {
      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
    }
  }

  /**
   * {@link RecordReader} for a {@link WAL} file. Implementation shared with the deprecated
   * HLogInputFormat.
   */
  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
    private WALStreamReader reader = null;
    // visible until we can remove the deprecated HLogInputFormat
    Entry currentEntry = new Entry();
    private long startTime;
    private long endTime;
    private Configuration conf;
    private Path logFile;
    private long currentPos;

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
        justification = "HDFS-4380")
    private WALStreamReader openReader(Path path, long startPosition) throws IOException {
      long retryInterval = 2000; // 2 sec
      int maxAttempts = 30;
      int attempt = 0;
      Exception ee = null;
      WALStreamReader reader = null;
      while (reader == null && attempt++ < maxAttempts) {
        try {
          // Detect if this is a new file, if so get a new reader else
          // reset the current reader so that we see the new data
          reader =
            WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
          return reader;
        } catch (LeaseNotRecoveredException lnre) {
          // HBASE-15019: the WAL was not closed due to some hiccup.
          LOG.warn("Try to recover the WAL lease " + path, lnre);
          AbstractFSWALProvider.recoverLease(conf, path);
          reader = null;
          ee = lnre;
        } catch (NullPointerException npe) {
          // Workaround for a race condition in HDFS-4380, which throws an NPE if we open a file
          // before any data node has the most recent block. Just sleep and retry. Will require
          // re-reading compressed WALs for compressionContext.
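          // With the defaults above, the surrounding loop retries for up to a minute
          // (30 attempts at 2 second intervals) before giving up and throwing an IOException.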
          LOG.warn("Got NPE opening reader, will retry.");
          reader = null;
          ee = npe;
        }
        if (reader == null) {
          // sleep before next attempt
          try {
            Thread.sleep(retryInterval);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }
      throw new IOException("Could not open reader", ee);
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      WALSplit hsplit = (WALSplit) split;
      logFile = new Path(hsplit.getLogFileName());
      conf = context.getConfiguration();
      LOG.info("Opening {} for {}", logFile, split);
      openReader(logFile);
      this.startTime = hsplit.getStartTime();
      this.endTime = hsplit.getEndTime();
    }

    private void openReader(Path path) throws IOException {
      closeReader();
      reader = openReader(path, currentPos > 0 ? currentPos : -1);
      setCurrentPath(path);
    }

    private void setCurrentPath(Path path) {
      this.logFile = path;
    }

    private void closeReader() throws IOException {
      if (reader != null) {
        reader.close();
        reader = null;
      }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (reader == null) {
        return false;
      }
      this.currentPos = reader.getPosition();
      Entry temp;
      long i = -1;
      try {
        do {
          // skip older entries
          try {
            temp = reader.next(currentEntry);
            i++;
          } catch (EOFException x) {
            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
              + " (This is normal when a RegionServer crashed.)");
            return false;
          }
        } while (temp != null && temp.getKey().getWriteTime() < startTime);

        if (temp == null) {
          if (i > 0) {
            LOG.info("Skipped " + i + " entries.");
          }
          LOG.info("Reached end of file.");
          return false;
        } else if (i > 0) {
          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
        }
        boolean res = temp.getKey().getWriteTime() <= endTime;
        if (!res) {
          LOG.info(
            "Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
        }
        return res;
      } catch (IOException e) {
        Path archivedLog = AbstractFSWALProvider.findArchivedLog(logFile, conf);
        // archivedLog can be null if we are unable to locate the file in the archive directory.
        if (archivedLog != null) {
          openReader(archivedLog);
          // Retry the read against the archived log via recursion.
          return nextKeyValue();
        } else {
          throw e;
        }
      }
    }

    @Override
    public WALEdit getCurrentValue() throws IOException, InterruptedException {
      return currentEntry.getEdit();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // N/A: depends on the total number of entries, which is unknown
      return 0;
    }

    @Override
    public void close() throws IOException {
      LOG.info("Closing reader");
      if (reader != null) {
        this.reader.close();
      }
    }
  }

  /**
   * Handler for the non-deprecated {@link WALKey} version. Fold into WALRecordReader once we no
   * longer need to support HLogInputFormat.
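   * The key exposed to the mapper is the {@link WALKey} of the current entry; the matching
   * {@link WALEdit} is returned by {@link WALRecordReader#getCurrentValue()}.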
   */
  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
    @Override
    public WALKey getCurrentKey() throws IOException, InterruptedException {
      return currentEntry.getKey();
    }
  }

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
  }

  /**
   * Implementation shared with the deprecated HLogInputFormat.
   */
  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
    throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
    Path[] inputPaths = getInputPaths(conf);
    // get delegation token for the filesystem
    TokenCache.obtainTokensForNamenodes(context.getCredentials(), inputPaths, conf);
    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
    long endTime = conf.getLong(endKey, Long.MAX_VALUE);

    List<FileStatus> allFiles = new ArrayList<FileStatus>();
    for (Path inputPath : inputPaths) {
      FileSystem fs = inputPath.getFileSystem(conf);
      try {
        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
        allFiles.addAll(files);
      } catch (FileNotFoundException e) {
        if (ignoreMissing) {
          LOG.warn("File " + inputPath + " is missing. Skipping it.");
          continue;
        }
        throw e;
      }
    }
    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
    for (FileStatus file : allFiles) {
      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
    }
    return splits;
  }

  private Path[] getInputPaths(Configuration conf) {
    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
    return StringUtils
      .stringToPath(inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
  }

  /**
   * @param startTime If a file looks like it has a timestamp in its name, we keep it only if that
   *                  timestamp is newer than or equal to this value; otherwise the file is
   *                  filtered out. If the name does not seem to contain a timestamp, the file is
   *                  returned without filtering.
   * @param endTime   If a file looks like it has a timestamp in its name, we keep it only if that
   *                  timestamp is older than or equal to this value; otherwise the file is
   *                  filtered out. If the name does not seem to contain a timestamp, the file is
   *                  returned without filtering.
   */
  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
    throws IOException {
    List<FileStatus> result = new ArrayList<>();
    LOG.debug("Scanning " + dir.toString() + " for WAL files");
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
    if (!iter.hasNext()) {
      return Collections.emptyList();
    }
    while (iter.hasNext()) {
      LocatedFileStatus file = iter.next();
      if (file.isDirectory()) {
        // Recurse into sub directories
        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
      } else {
        addFile(result, file, startTime, endTime);
      }
    }
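    // Because of the recursion above, WAL files in nested layouts (e.g. one subdirectory per
    // RegionServer) are collected as well.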
    // TODO: Should these results be sorted? Results could be the content of a recovered.edits
    // directory -- null padded increasing numeric -- or a WAL file w/ timestamp suffix or
    // timestamp and then meta suffix. See AbstractFSWALProvider#WALStartTimeComparator
    return result;
  }

  static void addFile(List<FileStatus> result, LocatedFileStatus lfs, long startTime,
    long endTime) {
    long timestamp = WAL.getTimestamp(lfs.getPath().getName());
    if (timestamp > 0) {
      // Looks like a valid timestamp.
      if (timestamp <= endTime && timestamp >= startTime) {
        LOG.info("Found {}", lfs.getPath());
        result.add(lfs);
      } else {
        LOG.info("Skipped {}, outside range [{}/{} - {}/{}]", lfs.getPath(), startTime,
          Instant.ofEpochMilli(startTime), endTime, Instant.ofEpochMilli(endTime));
      }
    } else {
      // If there is no timestamp, add the file regardless.
      LOG.info("Found (no-timestamp!) {}", lfs);
      result.add(lfs);
    }
  }

  @Override
  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    return new WALKeyRecordReader();
  }
}