/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBuffInputStream;
import org.apache.hadoop.hbase.io.ByteBufferWriter;
import org.apache.hadoop.hbase.io.ByteBufferWriterOutputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
/**
 * Compression in this class resembles that of Compressor/KeyValueCompression, but this is a pure
 * coincidence: they are independent and do not have to be compatible. This codec is used on the
 * server side for writing cells to the WAL as well as for sending edits as part of the
 * distributed splitting process.
 */
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
  HBaseInterfaceAudience.CONFIG })
public class WALCellCodec implements Codec {
  /** Configuration key for the class to use when encoding cells in the WAL */
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";

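  // A minimal usage sketch, not part of this class's contract: the codec implementation is
  // resolved from the configuration, so a custom codec is plugged in by setting the key above.
  // MyWALCellCodec here is a hypothetical subclass with the required
  // (Configuration, CompressionContext) constructor.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, MyWALCellCodec.class.getName());
  //   WALCellCodec codec = WALCellCodec.create(conf, null); // null: no compression context
  //
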
  protected final CompressionContext compression;

  /**
   * <b>All subclasses must implement a no-argument constructor.</b>
   */
  public WALCellCodec() {
    this.compression = null;
  }

  /**
   * Default constructor - <b>all subclasses must implement a constructor with this signature</b>
   * if they are to be dynamically loaded from the {@link Configuration}.
   * @param conf        configuration to configure <tt>this</tt>
   * @param compression compression the codec should support, can be <tt>null</tt> to indicate no
   *                    compression
   */
  public WALCellCodec(Configuration conf, CompressionContext compression) {
    this.compression = compression;
  }

  public static Class<?> getWALCellCodecClass(Configuration conf) {
    return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
  }

  /**
   * Create and set up a {@link WALCellCodec} from the {@code cellCodecClsName} and
   * CompressionContext, if {@code cellCodecClsName} is specified. Otherwise the cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf             {@link Configuration} to read for the user-specified codec. If none is
   *                         specified, uses a {@link WALCellCodec}.
   * @param cellCodecClsName name of codec
   * @param compression      compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
    CompressionContext compression) throws UnsupportedOperationException {
    if (cellCodecClsName == null) {
      cellCodecClsName = getWALCellCodecClass(conf).getName();
    }
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }


  /**
   * Create and set up a {@link WALCellCodec} from the CompressionContext. The cell codec class
   * name is read from the {@link Configuration}. Fully prepares the codec for use.
   * @param conf        {@link Configuration} to read for the user-specified codec. If none is
   *                    specified, uses a {@link WALCellCodec}.
   * @param compression compression the codec should use
   * @return a {@link WALCellCodec} ready for use.
   * @throws UnsupportedOperationException if the codec cannot be instantiated
   */
  public static WALCellCodec create(Configuration conf, CompressionContext compression)
    throws UnsupportedOperationException {
    String cellCodecClsName = getWALCellCodecClass(conf).getName();
    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName,
      new Class[] { Configuration.class, CompressionContext.class },
      new Object[] { conf, compression });
  }

  public interface ByteStringCompressor {
    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
  }

  public interface ByteStringUncompressor {
    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
  }

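  // Illustrative round trip through the pair of interfaces above (a sketch only; it assumes a
  // codec built with a non-null CompressionContext, and 'data' stands for any byte[]):
  //
  //   ByteStringCompressor compressor = codec.getByteStringCompressor();
  //   ByteString compressed = compressor.compress(data, CompressionContext.DictionaryIndex.ROW);
  //   ByteStringUncompressor uncompressor = codec.getByteStringUncompressor();
  //   byte[] roundTripped = uncompressor.uncompress(compressed,
  //     CompressionContext.DictionaryIndex.ROW);
  //
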
  static class StatelessUncompressor implements ByteStringUncompressor {
    CompressionContext compressionContext;

    public StatelessUncompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
    }
  }

  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
    private CompressionContext compressionContext;

    public BaosAndCompressor(CompressionContext compressionContext) {
      this.compressionContext = compressionContext;
    }

    public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      return ByteString.copyFrom(this.buf, 0, this.count);
    }

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
      // them.
      ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
      reset(); // Only resets the count - we reuse the byte array.
      return result;
    }

    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
      Dictionary dict = compressionContext.getDictionary(dictIndex);
      assert dict != null;
      short dictIdx = dict.findEntry(data, 0, data.length);
      if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
        write(Dictionary.NOT_IN_DICTIONARY);
        StreamUtils.writeRawVInt32(this, data.length);
        write(data, 0, data.length);
      } else {
        StreamUtils.writeShort(this, dictIdx);
      }
    }
  }

  static class NoneCompressor implements ByteStringCompressor {

    @Override
    public ByteString compress(byte[] data, Enum dictIndex) {
      return UnsafeByteOperations.unsafeWrap(data);
    }
  }

  static class NoneUncompressor implements ByteStringUncompressor {

    @Override
    public byte[] uncompress(ByteString data, Enum dictIndex) {
      return data.toByteArray();
    }
  }

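  /**
   * Reads back a single value written by {@link BaosAndCompressor#writeCompressed(byte[], Enum)}:
   * either a {@link Dictionary#NOT_IN_DICTIONARY} marker byte followed by a vint length and the
   * raw bytes (which are then added to the dictionary), or a two-byte dictionary index whose
   * high-order byte is the status byte itself.
   */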
  private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
    InputStream in = bs.newInput();
    byte status = StreamUtils.readByte(in);
    if (status == Dictionary.NOT_IN_DICTIONARY) {
      byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
      int bytesRead = in.read(arr);
      if (bytesRead != arr.length) {
        throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
      }
      if (dict != null) {
        dict.addEntry(arr, 0, arr.length);
      }
      return arr;
    } else {
      // Status here is the higher-order byte of index of the dictionary entry.
      short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index " + dictIdx);
      }
      return entry;
    }
  }

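  /**
   * Encodes cells using the WAL's dictionary (and optionally value and tag) compression. The
   * layout written here, and read back by {@link CompressedKvDecoder}, is: vint key length, vint
   * value length, vint tags length, dictionary-compressed row, family and qualifier, an 8-byte
   * timestamp, the type byte, the (possibly compressed) value, and finally the tags, if any.
   */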
  static class CompressedKvEncoder extends BaseEncoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
      super(out);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    public void write(ExtendedCell cell) throws IOException {
      // We first write the KeyValue infrastructure as VInts.
      StreamUtils.writeRawVInt32(out, KeyValueUtil.keyLength(cell));
      StreamUtils.writeRawVInt32(out, cell.getValueLength());
      // To support tags
      int tagsLength = cell.getTagsLength();
      StreamUtils.writeRawVInt32(out, tagsLength);
      PrivateCellUtil.compressRow(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      PrivateCellUtil.compressFamily(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      PrivateCellUtil.compressQualifier(out, cell,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      // Write timestamp, type and value.
      StreamUtils.writeLong(out, cell.getTimestamp());
      out.write(cell.getTypeByte());
      if (hasValueCompression) {
        writeCompressedValue(out, cell);
      } else {
        PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
      }
      if (tagsLength > 0) {
        if (hasTagCompression) {
          // Write tags using Dictionary compression
          PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
        } else {
          // Tag compression is disabled within the WAL compression. Just write the tag bytes as
          // they are.
          PrivateCellUtil.writeTags(out, cell, tagsLength);
        }
      }
    }

    private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
      byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(),
        cell.getValueOffset(), cell.getValueLength());
      StreamUtils.writeRawVInt32(out, compressed.length);
      out.write(compressed);
    }

  }

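  /**
   * Decodes cells written by {@link CompressedKvEncoder}, reassembling each one into a contiguous
   * KeyValue-format backing array.
   */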
  static class CompressedKvDecoder extends BaseDecoder {
    private final CompressionContext compression;
    private final boolean hasValueCompression;
    private final boolean hasTagCompression;

    public CompressedKvDecoder(InputStream in, CompressionContext compression) {
      super(in);
      this.compression = compression;
      this.hasValueCompression = compression.hasValueCompression();
      this.hasTagCompression = compression.hasTagCompression();
    }

    @Override
    protected ExtendedCell parseCell() throws IOException {
      int keylength = StreamUtils.readRawVarint32(in);
      int vlength = StreamUtils.readRawVarint32(in);
      int tagsLength = StreamUtils.readRawVarint32(in);
      int length = 0;
      if (tagsLength == 0) {
        length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
      } else {
        length = KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + keylength + vlength + tagsLength;
      }

      byte[] backingArray = new byte[length];
      int pos = 0;
      pos = Bytes.putInt(backingArray, pos, keylength);
      pos = Bytes.putInt(backingArray, pos, vlength);

      // the row
      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
      checkLength(elemLen, Short.MAX_VALUE);
      pos = Bytes.putShort(backingArray, pos, (short) elemLen);
      pos += elemLen;

      // family
      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
      checkLength(elemLen, Byte.MAX_VALUE);
      pos = Bytes.putByte(backingArray, pos, (byte) elemLen);
      pos += elemLen;

      // qualifier
      elemLen = readIntoArray(backingArray, pos,
        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
      pos += elemLen;

      // timestamp
      long ts = StreamUtils.readLong(in);
      pos = Bytes.putLong(backingArray, pos, ts);
      // type and value
      int typeValLen = length - pos;
      if (tagsLength > 0) {
        typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE;
      }
      pos = Bytes.putByte(backingArray, pos, (byte) in.read());
      int valLen = typeValLen - 1;
      if (hasValueCompression) {
        readCompressedValue(in, backingArray, pos, valLen);
        pos += valLen;
      } else {
        IOUtils.readFully(in, backingArray, pos, valLen);
        pos += valLen;
      }
      // tags
      if (tagsLength > 0) {
        pos = Bytes.putAsShort(backingArray, pos, tagsLength);
        if (hasTagCompression) {
          compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength);
        } else {
          IOUtils.readFully(in, backingArray, pos, tagsLength);
        }
      }
      return new KeyValue(backingArray, 0, length);
    }

    private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
      byte status = StreamUtils.readByte(in);
      if (status == Dictionary.NOT_IN_DICTIONARY) {
        // The status byte indicates that the data to be read is not in the dictionary;
        // read it and add it to the dictionary.
        int length = StreamUtils.readRawVarint32(in);
        IOUtils.readFully(in, to, offset, length);
        dict.addEntry(to, offset, length);
        return length;
      } else {
        // the status byte also acts as the higher order byte of the dictionary entry.
        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
        byte[] entry = dict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
        }
        // now we write the uncompressed value.
        Bytes.putBytes(to, offset, entry, 0, entry.length);
        return entry.length;
      }
    }

    private static void checkLength(int len, int max) throws IOException {
      if (len < 0 || len > max) {
        throw new IOException("Invalid length for compressed portion of keyvalue: " + len);
      }
    }

    private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
      int expectedLength) throws IOException {
      int compressedLen = StreamUtils.readRawVarint32(in);
      compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
        expectedLength);
    }
  }

  public static class EnsureKvEncoder extends BaseEncoder {
    public EnsureKvEncoder(OutputStream out) {
      super(out);
    }

    @Override
    public void write(ExtendedCell cell) throws IOException {
      checkFlushed();
      // Make sure to write tags into WAL
      ByteBufferUtils.putInt(this.out, cell.getSerializedSize(true));
      cell.write(out, true);
    }
  }

  @Override
  public Decoder getDecoder(InputStream is) {
    return (compression == null)
      ? new KeyValueCodecWithTags.KeyValueDecoder(is)
      : new CompressedKvDecoder(is, compression);
  }

  @Override
  public Decoder getDecoder(ByteBuff buf) {
    return getDecoder(new ByteBuffInputStream(buf));
  }

  @Override
  public Encoder getEncoder(OutputStream os) {
    os = (os instanceof ByteBufferWriter) ? os : new ByteBufferWriterOutputStream(os);
    if (compression == null) {
      return new EnsureKvEncoder(os);
    }
    return new CompressedKvEncoder(os, compression);
  }

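  // A minimal encode/decode sketch (illustrative only; cell construction and error handling are
  // elided, and 'cell' stands for any ExtendedCell such as a KeyValue):
  //
  //   ByteArrayOutputStream baos = new ByteArrayOutputStream();
  //   Encoder encoder = codec.getEncoder(baos);
  //   encoder.write(cell);
  //   encoder.flush();
  //   Decoder decoder = codec.getDecoder(new ByteArrayInputStream(baos.toByteArray()));
  //   while (decoder.advance()) {
  //     Cell decoded = decoder.current();
  //   }
  //
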
425
426  public ByteStringCompressor getByteStringCompressor() {
427    return new BaosAndCompressor(compression);
428  }
429
430  public ByteStringUncompressor getByteStringUncompressor() {
431    return new StatelessUncompressor(compression);
432  }
433
434  public static ByteStringCompressor getNoneCompressor() {
435    return new NoneCompressor();
436  }
437
438  public static ByteStringUncompressor getNoneUncompressor() {
439    return new NoneUncompressor();
440  }
441}