001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import org.apache.hadoop.fs.FSDataInputStream; 024import org.apache.hadoop.fs.FileStatus; 025import org.apache.hadoop.hbase.io.DelegatingInputStream; 026import org.apache.hadoop.hbase.io.util.StreamUtils; 027import org.apache.hadoop.hbase.util.Pair; 028import org.apache.hadoop.hbase.wal.WAL.Entry; 029import org.apache.hadoop.hbase.wal.WALTailingReader; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; 035import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; 036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 037 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 040 041/** 042 * A WAL reader for replication. It supports reset so can be used to tail a WAL file which is being 043 * written currently. 044 */ 045@InterfaceAudience.Private 046public class ProtobufWALTailingReader extends AbstractProtobufWALReader 047 implements WALTailingReader { 048 049 private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class); 050 051 private DelegatingInputStream delegatingInput; 052 053 private static final class ReadWALKeyResult { 054 final State state; 055 final Entry entry; 056 final int followingKvCount; 057 058 public ReadWALKeyResult(State state, Entry entry, int followingKvCount) { 059 this.state = state; 060 this.entry = entry; 061 this.followingKvCount = followingKvCount; 062 } 063 } 064 065 private static final ReadWALKeyResult KEY_ERROR_AND_RESET = 066 new ReadWALKeyResult(State.ERROR_AND_RESET, null, 0); 067 068 private static final ReadWALKeyResult KEY_EOF_AND_RESET = 069 new ReadWALKeyResult(State.EOF_AND_RESET, null, 0); 070 071 private IOException unwrapIPBE(IOException e) { 072 if (e instanceof InvalidProtocolBufferException) { 073 return ((InvalidProtocolBufferException) e).unwrapIOException(); 074 } else { 075 return e; 076 } 077 } 078 079 private ReadWALKeyResult readWALKey(long originalPosition) { 080 int firstByte; 081 try { 082 firstByte = delegatingInput.read(); 083 } catch (IOException e) { 084 LOG.warn("Failed to read wal key length first byte", e); 085 return KEY_ERROR_AND_RESET; 086 } 087 if (firstByte == -1) { 088 return KEY_EOF_AND_RESET; 089 } 090 int size; 091 try { 092 size = CodedInputStream.readRawVarint32(firstByte, delegatingInput); 093 } catch (IOException e) { 094 // if we are reading a partial WALTrailer, the size will just be 0 so we will not get an 095 // exception here, so do not need to check whether it is a partial WALTrailer. 096 if ( 097 e instanceof InvalidProtocolBufferException 098 && ProtobufUtil.isEOF((InvalidProtocolBufferException) e) 099 ) { 100 LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}", 101 originalPosition, getPositionQuietly(), e.toString()); 102 return KEY_EOF_AND_RESET; 103 } else { 104 LOG.warn("Failed to read wal key length", e); 105 return KEY_ERROR_AND_RESET; 106 } 107 } 108 if (size < 0) { 109 LOG.warn("Negative pb message size read: {}, malformed WAL file?", size); 110 return KEY_ERROR_AND_RESET; 111 } 112 int available; 113 try { 114 available = delegatingInput.available(); 115 } catch (IOException e) { 116 LOG.warn("Failed to get available bytes", e); 117 return KEY_ERROR_AND_RESET; 118 } 119 if (available > 0 && available < size) { 120 LOG.info( 121 "Available stream not enough for edit, available={}, " + "entry size={} at offset={}", 122 available, size, getPositionQuietly()); 123 return KEY_EOF_AND_RESET; 124 } 125 WALProtos.WALKey walKey; 126 try { 127 if (available > 0) { 128 walKey = WALProtos.WALKey.parseFrom(ByteStreams.limit(delegatingInput, size)); 129 } else { 130 byte[] content = new byte[size]; 131 ByteStreams.readFully(delegatingInput, content); 132 walKey = WALProtos.WALKey.parseFrom(content); 133 } 134 } catch (IOException e) { 135 e = unwrapIPBE(e); 136 if ( 137 e instanceof EOFException || (e instanceof InvalidProtocolBufferException 138 && ProtobufUtil.isEOF((InvalidProtocolBufferException) e)) 139 ) { 140 LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}", 141 originalPosition, getPositionQuietly(), e.toString()); 142 return KEY_EOF_AND_RESET; 143 } else { 144 boolean isWALTrailer; 145 try { 146 isWALTrailer = isWALTrailer(originalPosition); 147 } catch (IOException ioe) { 148 LOG.warn("Error while testing whether this is a partial WAL trailer, originalPosition={}," 149 + " currentPosition={}", originalPosition, getPositionQuietly(), e); 150 return KEY_ERROR_AND_RESET; 151 } 152 if (isWALTrailer) { 153 LOG.info("Reached partial WAL Trailer(EOF) while reading WALKey, originalPosition={}," 154 + " currentPosition={}", originalPosition, getPositionQuietly(), e); 155 return KEY_EOF_AND_RESET; 156 } else { 157 // for all other type of IPBEs or IOEs, it means the WAL key is broken 158 LOG.warn("Error while reading WALKey, originalPosition={}, currentPosition={}", 159 originalPosition, getPositionQuietly(), e); 160 return KEY_ERROR_AND_RESET; 161 } 162 } 163 } 164 Entry entry = new Entry(); 165 try { 166 entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor); 167 } catch (IOException e) { 168 LOG.warn("Failed to read wal key fields from pb message", e); 169 return KEY_ERROR_AND_RESET; 170 } 171 return new ReadWALKeyResult(State.NORMAL, entry, 172 walKey.hasFollowingKvCount() ? walKey.getFollowingKvCount() : 0); 173 } 174 175 private Result editEof() { 176 return hasCompression 177 ? State.EOF_AND_RESET_COMPRESSION.getResult() 178 : State.EOF_AND_RESET.getResult(); 179 } 180 181 private Result editError() { 182 return hasCompression 183 ? State.ERROR_AND_RESET_COMPRESSION.getResult() 184 : State.ERROR_AND_RESET.getResult(); 185 } 186 187 private Result readWALEdit(Entry entry, int followingKvCount) { 188 long posBefore; 189 try { 190 posBefore = inputStream.getPos(); 191 } catch (IOException e) { 192 LOG.warn("failed to get position", e); 193 return State.ERROR_AND_RESET.getResult(); 194 } 195 if (followingKvCount == 0) { 196 LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}", 197 posBefore); 198 return new Result(State.NORMAL, entry, posBefore); 199 } 200 int actualCells; 201 try { 202 actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount); 203 } catch (Exception e) { 204 String message = " while reading " + followingKvCount + " WAL KVs; started reading at " 205 + posBefore + " and read up to " + getPositionQuietly(); 206 IOException realEofEx = extractHiddenEof(e); 207 if (realEofEx != null) { 208 LOG.warn("EOF " + message, realEofEx); 209 return editEof(); 210 } else { 211 LOG.warn("Error " + message, e); 212 return editError(); 213 } 214 } 215 if (actualCells != followingKvCount) { 216 LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}", 217 actualCells, followingKvCount, posBefore, getPositionQuietly()); 218 return editEof(); 219 } 220 long posAfter; 221 try { 222 posAfter = inputStream.getPos(); 223 } catch (IOException e) { 224 LOG.warn("failed to get position", e); 225 return editError(); 226 } 227 if (trailerPresent && posAfter > this.walEditsStopOffset) { 228 LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {}," 229 + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset); 230 return editEof(); 231 } 232 return new Result(State.NORMAL, entry, posAfter); 233 } 234 235 @Override 236 public Result next(long limit) { 237 long originalPosition; 238 try { 239 originalPosition = inputStream.getPos(); 240 } catch (IOException e) { 241 LOG.warn("failed to get position", e); 242 return State.EOF_AND_RESET.getResult(); 243 } 244 if (reachWALEditsStopOffset(originalPosition)) { 245 return State.EOF_WITH_TRAILER.getResult(); 246 } 247 if (limit < 0) { 248 // should be closed WAL file, set to no limit, i.e, just use the original inputStream 249 delegatingInput.setDelegate(inputStream); 250 } else if (limit <= originalPosition) { 251 // no data available, just return EOF 252 return State.EOF_AND_RESET.getResult(); 253 } else { 254 // calculate the remaining bytes we can read and set 255 delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition)); 256 } 257 ReadWALKeyResult readKeyResult = readWALKey(originalPosition); 258 if (readKeyResult.state != State.NORMAL) { 259 return readKeyResult.state.getResult(); 260 } 261 return readWALEdit(readKeyResult.entry, readKeyResult.followingKvCount); 262 } 263 264 private void skipHeader(FSDataInputStream stream) throws IOException { 265 stream.seek(PB_WAL_MAGIC.length); 266 int headerLength = StreamUtils.readRawVarint32(stream); 267 stream.seek(stream.getPos() + headerLength); 268 } 269 270 @Override 271 public void resetTo(long position, boolean resetCompression) throws IOException { 272 close(); 273 Pair<FSDataInputStream, FileStatus> pair = open(); 274 boolean resetSucceed = false; 275 try { 276 if (!trailerPresent) { 277 // try read trailer this time 278 readTrailer(pair.getFirst(), pair.getSecond()); 279 } 280 inputStream = pair.getFirst(); 281 delegatingInput.setDelegate(inputStream); 282 if (position < 0) { 283 // read from the beginning 284 if (compressionCtx != null) { 285 compressionCtx.clear(); 286 } 287 skipHeader(inputStream); 288 } else if (resetCompression && compressionCtx != null) { 289 // clear compressCtx and skip to the expected position, to fill up the dictionary 290 compressionCtx.clear(); 291 skipHeader(inputStream); 292 if (position != inputStream.getPos()) { 293 skipTo(position); 294 } 295 } else { 296 // just seek to the expected position 297 inputStream.seek(position); 298 } 299 resetSucceed = true; 300 } finally { 301 if (!resetSucceed) { 302 // close the input stream to avoid resource leak 303 close(); 304 } 305 } 306 } 307 308 @Override 309 protected InputStream getCellCodecInputStream(FSDataInputStream stream) { 310 delegatingInput = new DelegatingInputStream(stream); 311 return delegatingInput; 312 } 313 314 @Override 315 protected void skipTo(long position) throws IOException { 316 for (;;) { 317 Result result = next(-1); 318 if (result.getState() != State.NORMAL) { 319 throw new IOException("Can not skip to the given position " + position + ", stopped at " 320 + result.getEntryEndPos() + " which is still before the give position"); 321 } 322 if (result.getEntryEndPos() == position) { 323 return; 324 } 325 if (result.getEntryEndPos() > position) { 326 throw new IOException("Can not skip to the given position " + position + ", stopped at " 327 + result.getEntryEndPos() + " which is already beyond the give position, malformed WAL?"); 328 } 329 } 330 } 331}