/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

/**
 * Compression in this class is lifted off Compressor/KeyValueCompression. This is a pure
 * coincidence... they are independent and don't have to be compatible. This codec is used at
 * server side for writing cells to the WAL as well as for sending edits as part of the
 * distributed splitting process.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG })
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no argument constructor.</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * <b>All subclasses must implement a constructor with this signature</b> if they are to be
   * dynamically loaded from the {@link Configuration}.
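   * <p>
   * A minimal subclass sketch (the class name {@code MyWALCellCodec} is hypothetical, shown only
   * to illustrate the required constructors):
   *
   * <pre>
   * public class MyWALCellCodec extends WALCellCodec {
   *   // No-argument constructor, required of all subclasses.
   *   public MyWALCellCodec() {
   *   }
   *
   *   // Two-argument constructor used when the codec is loaded reflectively from the config.
   *   public MyWALCellCodec(Configuration conf, CompressionContext compression) {
   *     super(conf, compression);
   *   }
   * }
   * </pre>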
   * @param conf        configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *                    compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified. Otherwise the cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf             {@link Configuration} to read for the user-specified codec. If none is
   *                         specified, uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression      compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
    CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }

  /**
   * Create and setup a {@link WALCellCodec} from the CompressionContext. The cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf        {@link Configuration} to read for the user-specified codec. If none is
   *                    specified, uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, CompressionContext compression)
    throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }
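  // Usage sketch (illustrative only; MyWALCellCodec, walOutputStream, walInputStream and
  // compressionContext are hypothetical names, not part of this class): wire a custom codec in
  // through the configuration key, then obtain an encoder/decoder from the created codec.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, MyWALCellCodec.class.getName());
  //   // The CompressionContext may be null when WAL compression is disabled.
  //   WALCellCodec codec = WALCellCodec.create(conf, null, compressionContext);
  //   Codec.Encoder encoder = codec.getEncoder(walOutputStream);
  //   Codec.Decoder decoder = codec.getDecoder(walInputStream);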
  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

  static class StatelessUncompressor implements ByteStringUncompressor {
    CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }

  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We
      // reuse it.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We
      // reuse it.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }
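  // On-wire format produced by BaosAndCompressor.writeCompressed() above and read back by
  // uncompressByteString() and CompressedKvDecoder.readIntoArray() below:
  //
  //   Dictionary miss: [ NOT_IN_DICTIONARY status byte ][ vint length ][ raw bytes ... ]
  //   Dictionary hit:  [ status byte = high byte of short index ][ low byte of short index ]
  //
  // A miss also installs the entry in the dictionary on both the writing and reading side, so a
  // repeated value (e.g. the same row key) costs its full length once and two bytes thereafter.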
  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = StreamUtils.readByte(in);
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }

  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    public void write(Cell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      if (hasValueCompression) {
        writeCompressedValue(out, cell);
      } else {
        PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      }
      if (tagsLength > 0) {
        if (hasTagCompression) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }
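    // Per-cell layout written by write() above and parsed back by CompressedKvDecoder.parseCell():
    //
    //   vint keyLength | vint valueLength | vint tagsLength
    //   | dict-compressed row | dict-compressed family | dict-compressed qualifier
    //   | long timestamp | 1 byte type
    //   | value (raw, or vint compressed-length + compressed bytes when value compression is on)
    //   | tags (raw or via tagCompressionContext), present only when tagsLength > 0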
    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength());
      StreamUtils.writeRawVInt32(out, compressed.length);
      out.write(compressed);
    }

  }

  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    protected Cell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);
      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp
      long ts = StreamUtils.readLong(in);
      pos = Bytes.putLong(backingArray, pos, ts);
      // type and value
      int typeValLen = length - pos;
      if (tagsLength > 0) {
        typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      pos = Bytes.putByte(backingArray, pos, (byte) in.read());
      int valLen = typeValLen - 1;
      if (hasValueCompression) {
        readCompressedValue(in, backingArray, pos, valLen);
        pos += valLen;
      } else {
        IOUtils.readFully(in, backingArray, pos, valLen);
        pos += valLen;
      }
      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (hasTagCompression) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }
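    // parseCell() above reassembles the standard KeyValue serialization in 'backingArray':
    //
    //   int keyLength | int valueLength
    //   | short rowLength | row | byte familyLength | family | qualifier
    //   | long timestamp | byte type | value | (short tagsLength | tags, when present)
    //
    // This is why row, family and qualifier are read at an offset that leaves room for their
    // length fields, which are filled in once the actual lengths are known.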
    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = StreamUtils.readByte(in);
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // status byte indicating that data to be read is not in dictionary.
        // if this isn't in the dictionary, we need to add to the dictionary.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // the status byte also acts as the higher order byte of the dictionary entry.
        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // now we write the uncompressed value.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }

    private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
      int expectedLength) throws IOException {
      int compressedLen = StreamUtils.readRawVarint32(in);
      compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
        expectedLength);
    }
  }

  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(Cell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, KeyValueUtil.getSerializedSize(cell, true));
      KeyValueUtil.oswrite(cell, this.out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
      ? new KeyValueCodecWithTags.KeyValueDecoder(is)
      : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

  public ByteStringCompressor getByteStringCompressor() {
    return new BaosAndCompressor(compression);
  }

  public ByteStringUncompressor getByteStringUncompressor() {
    return new StatelessUncompressor(compression);
  }

  public static ByteStringCompressor getNoneCompressor() {
    return new NoneCompressor();
  }

  public static ByteStringUncompressor getNoneUncompressor() {
    return new NoneUncompressor();
  }
}