001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.compress.xerial; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.io.compress.CanReinit; 024import org.apache.hadoop.hbase.io.compress.CompressionUtil; 025import org.apache.hadoop.io.compress.Compressor; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.xerial.snappy.Snappy; 028 029/** 030 * Hadoop compressor glue for Xerial Snappy. 031 */ 032@InterfaceAudience.Private 033public class SnappyCompressor implements CanReinit, Compressor { 034 035 protected ByteBuffer inBuf, outBuf; 036 protected int bufferSize; 037 protected boolean finish, finished; 038 protected long bytesRead, bytesWritten; 039 040 SnappyCompressor(int bufferSize) { 041 this.bufferSize = bufferSize; 042 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 043 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 044 this.outBuf.position(bufferSize); 045 } 046 047 @Override 048 public int compress(byte[] b, int off, int len) throws IOException { 049 // If we have previously compressed our input and still have some buffered bytes 050 // remaining, provide them to the caller. 051 if (outBuf.hasRemaining()) { 052 int remaining = outBuf.remaining(), n = Math.min(remaining, len); 053 outBuf.get(b, off, n); 054 return n; 055 } 056 // We don't actually begin compression until our caller calls finish(). 057 if (finish) { 058 if (inBuf.position() > 0) { 059 inBuf.flip(); 060 int uncompressed = inBuf.remaining(); 061 // If we don't have enough capacity in our currently allocated output buffer, 062 // allocate a new one which does. 063 int needed = maxCompressedLength(uncompressed); 064 if (outBuf.capacity() < needed) { 065 needed = CompressionUtil.roundInt2(needed); 066 outBuf = ByteBuffer.allocateDirect(needed); 067 } else { 068 outBuf.clear(); 069 } 070 int written = Snappy.compress(inBuf, outBuf); 071 bytesWritten += written; 072 inBuf.clear(); 073 finished = true; 074 int n = Math.min(written, len); 075 outBuf.get(b, off, n); 076 return n; 077 } else { 078 finished = true; 079 } 080 } 081 return 0; 082 } 083 084 @Override 085 public void end() { 086 } 087 088 @Override 089 public void finish() { 090 finish = true; 091 } 092 093 @Override 094 public boolean finished() { 095 return finished && !outBuf.hasRemaining(); 096 } 097 098 @Override 099 public long getBytesRead() { 100 return bytesRead; 101 } 102 103 @Override 104 public long getBytesWritten() { 105 return bytesWritten; 106 } 107 108 @Override 109 public boolean needsInput() { 110 return !finished(); 111 } 112 113 @Override 114 public void reinit(Configuration conf) { 115 if (conf != null) { 116 // Buffer size might have changed 117 int newBufferSize = SnappyCodec.getBufferSize(conf); 118 if (bufferSize != newBufferSize) { 119 bufferSize = newBufferSize; 120 this.inBuf = ByteBuffer.allocateDirect(bufferSize); 121 this.outBuf = ByteBuffer.allocateDirect(bufferSize); 122 } 123 } 124 reset(); 125 } 126 127 @Override 128 public void reset() { 129 inBuf.clear(); 130 outBuf.clear(); 131 outBuf.position(outBuf.capacity()); 132 bytesRead = 0; 133 bytesWritten = 0; 134 finish = false; 135 finished = false; 136 } 137 138 @Override 139 public void setDictionary(byte[] b, int off, int len) { 140 throw new UnsupportedOperationException("setDictionary is not supported"); 141 } 142 143 @Override 144 public void setInput(byte[] b, int off, int len) { 145 if (inBuf.remaining() < len) { 146 // Get a new buffer that can accomodate the accumulated input plus the additional 147 // input that would cause a buffer overflow without reallocation. 148 // This condition should be fortunately rare, because it is expensive. 149 int needed = CompressionUtil.roundInt2(inBuf.capacity() + len); 150 ByteBuffer newBuf = ByteBuffer.allocateDirect(needed); 151 inBuf.flip(); 152 newBuf.put(inBuf); 153 inBuf = newBuf; 154 } 155 inBuf.put(b, off, len); 156 bytesRead += len; 157 finished = false; 158 } 159 160 // Package private 161 162 int maxCompressedLength(int len) { 163 return Snappy.maxCompressedLength(len); 164 } 165 166}