001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.io.IOException; 021import java.io.InputStream; 022import org.apache.commons.io.IOUtils; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027/** 028 * This class is only used by WAL ValueCompressor for decompression. 029 * <p> 030 * <strong>WARNING: </strong>The implementation is very tricky and does not follow typical 031 * InputStream pattern, so do not use it in any other places. 032 */ 033@InterfaceAudience.Private 034class WALDecompressionBoundedDelegatingInputStream extends InputStream { 035 036 private static final Logger LOG = 037 LoggerFactory.getLogger(WALDecompressionBoundedDelegatingInputStream.class); 038 039 private InputStream in; 040 041 private long pos; 042 043 private long limit; 044 045 public void reset(InputStream in, long limit) { 046 this.in = in; 047 this.limit = limit; 048 this.pos = 0; 049 } 050 051 @Override 052 public int read() throws IOException { 053 if (pos >= limit) { 054 return -1; 055 } 056 int result = in.read(); 057 if (result < 0) { 058 return -1; 059 } 060 pos++; 061 return result; 062 } 063 064 @Override 065 public int read(byte[] b, int off, int len) throws IOException { 066 if (pos >= limit) { 067 return -1; 068 } 069 int toRead = (int) Math.min(len, limit - pos); 070 int readBytes = IOUtils.read(in, b, off, toRead); 071 // increase pos by however many we actually read 072 pos += readBytes; 073 074 if (readBytes != toRead) { 075 // This is trick here, we will always try to read enough bytes to fill the buffer passed in, 076 // or we reach the end of this compression block, if there are not enough bytes, we just 077 // return -1 to let the upper layer fail with EOF 078 // In WAL value decompression this is OK as if we can not read all the data, we will finally 079 // get an EOF somewhere 080 LOG.debug("Got EOF while we want to read {} bytes from stream, but only read {}", toRead, 081 readBytes); 082 return -1; 083 } 084 return toRead; 085 } 086 087 @Override 088 public long skip(final long len) throws IOException { 089 long skipped = in.skip(Math.min(len, limit - pos)); 090 pos += skipped; 091 return skipped; 092 } 093 094 @Override 095 public int available() throws IOException { 096 if (pos >= limit) { 097 return 0; 098 } 099 // Do not call the delegate's available() method. Data in a bounded input stream is assumed 100 // available up to the limit and that is the contract we have with our callers. Regardless 101 // of what we do here, read() and skip() will behave as expected when EOF is encountered if 102 // the underlying stream is closed early or otherwise could not provide enough bytes. 103 // Note: This class is used to supply buffers to compression codecs during WAL tailing and 104 // successful decompression depends on this behavior. 105 return (int) (limit - pos); 106 } 107}