001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import java.io.DataInputStream;
021import java.io.DataOutput;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import org.apache.hadoop.hbase.ExtendedCell;
026import org.apache.hadoop.hbase.KeyValue;
027import org.apache.hadoop.hbase.KeyValueUtil;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.nio.ByteBuff;
030import org.apache.hadoop.hbase.util.ByteBufferUtils;
031import org.apache.hadoop.hbase.util.Bytes;
032import org.apache.hadoop.hbase.util.ObjectIntPair;
033import org.apache.yetus.audience.InterfaceAudience;
034
035/**
036 * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster. Compress using: - store
037 * size of common prefix - save column family once in the first KeyValue - use integer compression
038 * for key, value and prefix (7-bit encoding) - use bits to avoid duplication key length, value
039 * length and type if it same as previous - store in 3 bits length of prefix timestamp with previous
040 * KeyValue's timestamp - one bit which allow to omit value if it is the same Format: - 1 byte: flag
041 * - 1-5 bytes: key length (only if FLAG_SAME_KEY_LENGTH is not set in flag) - 1-5 bytes: value
042 * length (only if FLAG_SAME_VALUE_LENGTH is not set in flag) - 1-5 bytes: prefix length - ...
043 * bytes: rest of the row (if prefix length is small enough) - ... bytes: qualifier (or suffix
044 * depending on prefix length) - 1-8 bytes: timestamp suffix - 1 byte: type (only if FLAG_SAME_TYPE
045 * is not set in the flag) - ... bytes: value (only if FLAG_SAME_VALUE is not set in the flag)
046 */
047@InterfaceAudience.Private
048public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
049  static final int MASK_TIMESTAMP_LENGTH = (1 << 0) | (1 << 1) | (1 << 2);
050  static final int SHIFT_TIMESTAMP_LENGTH = 0;
051  static final int FLAG_SAME_KEY_LENGTH = 1 << 3;
052  static final int FLAG_SAME_VALUE_LENGTH = 1 << 4;
053  static final int FLAG_SAME_TYPE = 1 << 5;
054  static final int FLAG_SAME_VALUE = 1 << 6;
055
056  private static class FastDiffCompressionState extends CompressionState {
057    byte[] timestamp = new byte[KeyValue.TIMESTAMP_SIZE];
058    int prevTimestampOffset;
059
060    @Override
061    protected void readTimestamp(ByteBuffer in) {
062      in.get(timestamp);
063    }
064
065    @Override
066    void copyFrom(CompressionState state) {
067      super.copyFrom(state);
068      FastDiffCompressionState state2 = (FastDiffCompressionState) state;
069      System.arraycopy(state2.timestamp, 0, timestamp, 0, KeyValue.TIMESTAMP_SIZE);
070      prevTimestampOffset = state2.prevTimestampOffset;
071    }
072
073    /**
074     * Copies the first key/value from the given stream, and initializes decompression state based
075     * on it. Assumes that we have already read key and value lengths. Does not set
076     * {@link #qualifierLength} (not used by decompression) or {@link #prevOffset} (set by the calle
077     * afterwards).
078     */
079    private void decompressFirstKV(ByteBuffer out, DataInputStream in) throws IOException {
080      int kvPos = out.position();
081      out.putInt(keyLength);
082      out.putInt(valueLength);
083      prevTimestampOffset = out.position() + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE;
084      ByteBufferUtils.copyFromStreamToBuffer(out, in, keyLength + valueLength);
085      rowLength = out.getShort(kvPos + KeyValue.ROW_OFFSET);
086      familyLength = out.get(kvPos + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + rowLength);
087      type = out.get(prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
088    }
089
090  }
091
092  private int findCommonTimestampPrefix(byte[] curTsBuf, byte[] prevTsBuf) {
093    int commonPrefix = 0;
094    while (
095      commonPrefix < (KeyValue.TIMESTAMP_SIZE - 1)
096        && curTsBuf[commonPrefix] == prevTsBuf[commonPrefix]
097    ) {
098      commonPrefix++;
099    }
100    return commonPrefix; // has to be at most 7 bytes
101  }
102
103  private void uncompressSingleKeyValue(DataInputStream source, ByteBuffer out,
104    FastDiffCompressionState state) throws IOException, EncoderBufferTooSmallException {
105    byte flag = source.readByte();
106    int prevKeyLength = state.keyLength;
107
108    if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
109      state.keyLength = ByteBufferUtils.readCompressedInt(source);
110    }
111    if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
112      state.valueLength = ByteBufferUtils.readCompressedInt(source);
113    }
114    int commonLength = ByteBufferUtils.readCompressedInt(source);
115
116    ensureSpace(out, state.keyLength + state.valueLength + KeyValue.ROW_OFFSET);
117
118    int kvPos = out.position();
119
120    if (!state.isFirst()) {
121      // copy the prefix
122      int common;
123      int prevOffset;
124
125      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
126        out.putInt(state.keyLength);
127        out.putInt(state.valueLength);
128        prevOffset = state.prevOffset + KeyValue.ROW_OFFSET;
129        common = commonLength;
130      } else {
131        if ((flag & FLAG_SAME_KEY_LENGTH) != 0) {
132          prevOffset = state.prevOffset;
133          common = commonLength + KeyValue.ROW_OFFSET;
134        } else {
135          out.putInt(state.keyLength);
136          prevOffset = state.prevOffset + KeyValue.KEY_LENGTH_SIZE;
137          common = commonLength + KeyValue.KEY_LENGTH_SIZE;
138        }
139      }
140
141      ByteBufferUtils.copyFromBufferToBuffer(out, out, prevOffset, common);
142
143      // copy the rest of the key from the buffer
144      int keyRestLength;
145      if (commonLength < state.rowLength + KeyValue.ROW_LENGTH_SIZE) {
146        // omit the family part of the key, it is always the same
147        int rowWithSizeLength;
148        int rowRestLength;
149
150        // check length of row
151        if (commonLength < KeyValue.ROW_LENGTH_SIZE) {
152          // not yet copied, do it now
153          ByteBufferUtils.copyFromStreamToBuffer(out, source,
154            KeyValue.ROW_LENGTH_SIZE - commonLength);
155
156          rowWithSizeLength =
157            out.getShort(out.position() - KeyValue.ROW_LENGTH_SIZE) + KeyValue.ROW_LENGTH_SIZE;
158          rowRestLength = rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE;
159        } else {
160          // already in kvBuffer, just read it
161          rowWithSizeLength = out.getShort(kvPos + KeyValue.ROW_OFFSET) + KeyValue.ROW_LENGTH_SIZE;
162          rowRestLength = rowWithSizeLength - commonLength;
163        }
164
165        // copy the rest of row
166        ByteBufferUtils.copyFromStreamToBuffer(out, source, rowRestLength);
167
168        // copy the column family
169        ByteBufferUtils.copyFromBufferToBuffer(out, out,
170          state.prevOffset + KeyValue.ROW_OFFSET + KeyValue.ROW_LENGTH_SIZE + state.rowLength,
171          state.familyLength + KeyValue.FAMILY_LENGTH_SIZE);
172        state.rowLength = (short) (rowWithSizeLength - KeyValue.ROW_LENGTH_SIZE);
173
174        keyRestLength = state.keyLength - rowWithSizeLength - state.familyLength
175          - (KeyValue.FAMILY_LENGTH_SIZE + KeyValue.TIMESTAMP_TYPE_SIZE);
176      } else {
177        // prevRowWithSizeLength is the same as on previous row
178        keyRestLength = state.keyLength - commonLength - KeyValue.TIMESTAMP_TYPE_SIZE;
179      }
180      // copy the rest of the key, after column family == column qualifier
181      ByteBufferUtils.copyFromStreamToBuffer(out, source, keyRestLength);
182
183      // copy timestamp
184      int prefixTimestamp = (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
185      ByteBufferUtils.copyFromBufferToBuffer(out, out, state.prevTimestampOffset, prefixTimestamp);
186      state.prevTimestampOffset = out.position() - prefixTimestamp;
187      ByteBufferUtils.copyFromStreamToBuffer(out, source,
188        KeyValue.TIMESTAMP_SIZE - prefixTimestamp);
189
190      // copy the type and value
191      if ((flag & FLAG_SAME_TYPE) != 0) {
192        out.put(state.type);
193        if ((flag & FLAG_SAME_VALUE) != 0) {
194          ByteBufferUtils.copyFromBufferToBuffer(out, out,
195            state.prevOffset + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
196        } else {
197          ByteBufferUtils.copyFromStreamToBuffer(out, source, state.valueLength);
198        }
199      } else {
200        if ((flag & FLAG_SAME_VALUE) != 0) {
201          ByteBufferUtils.copyFromStreamToBuffer(out, source, KeyValue.TYPE_SIZE);
202          ByteBufferUtils.copyFromBufferToBuffer(out, out,
203            state.prevOffset + KeyValue.ROW_OFFSET + prevKeyLength, state.valueLength);
204        } else {
205          ByteBufferUtils.copyFromStreamToBuffer(out, source,
206            state.valueLength + KeyValue.TYPE_SIZE);
207        }
208        state.type = out.get(state.prevTimestampOffset + KeyValue.TIMESTAMP_SIZE);
209      }
210    } else { // this is the first element
211      state.decompressFirstKV(out, source);
212    }
213
214    state.prevOffset = kvPos;
215  }
216
217  @Override
218  public int internalEncode(ExtendedCell cell, HFileBlockDefaultEncodingContext encodingContext,
219    DataOutputStream out) throws IOException {
220    EncodingState state = encodingContext.getEncodingState();
221    int size = compressSingleKeyValue(out, cell, state.prevCell);
222    size += afterEncodingKeyValue(cell, out, encodingContext);
223    state.prevCell = cell;
224    return size;
225  }
226
227  private int compressSingleKeyValue(DataOutputStream out, ExtendedCell cell, ExtendedCell prevCell)
228    throws IOException {
229    int flag = 0; // Do not use more bits than will fit into a byte
230    int kLength = KeyValueUtil.keyLength(cell);
231    int vLength = cell.getValueLength();
232
233    if (prevCell == null) {
234      // copy the key, there is no common prefix with none
235      out.write(flag);
236      ByteBufferUtils.putCompressedInt(out, kLength);
237      ByteBufferUtils.putCompressedInt(out, vLength);
238      ByteBufferUtils.putCompressedInt(out, 0);
239      PrivateCellUtil.writeFlatKey(cell, (DataOutput) out);
240      // Write the value part
241      PrivateCellUtil.writeValue(out, cell, cell.getValueLength());
242    } else {
243      int preKeyLength = KeyValueUtil.keyLength(prevCell);
244      int preValLength = prevCell.getValueLength();
245      // find a common prefix and skip it
246      int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(cell, prevCell, true, false);
247
248      if (kLength == preKeyLength) {
249        flag |= FLAG_SAME_KEY_LENGTH;
250      }
251      if (vLength == prevCell.getValueLength()) {
252        flag |= FLAG_SAME_VALUE_LENGTH;
253      }
254      if (cell.getTypeByte() == prevCell.getTypeByte()) {
255        flag |= FLAG_SAME_TYPE;
256      }
257
258      byte[] curTsBuf = Bytes.toBytes(cell.getTimestamp());
259      int commonTimestampPrefix =
260        findCommonTimestampPrefix(curTsBuf, Bytes.toBytes(prevCell.getTimestamp()));
261
262      flag |= commonTimestampPrefix << SHIFT_TIMESTAMP_LENGTH;
263
264      // Check if current and previous values are the same. Compare value
265      // length first as an optimization.
266      if (
267        vLength == preValLength
268          && PrivateCellUtil.matchingValue(cell, prevCell, vLength, preValLength)
269      ) {
270        flag |= FLAG_SAME_VALUE;
271      }
272
273      out.write(flag);
274      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
275        ByteBufferUtils.putCompressedInt(out, kLength);
276      }
277      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
278        ByteBufferUtils.putCompressedInt(out, vLength);
279      }
280      ByteBufferUtils.putCompressedInt(out, commonPrefix);
281      short rLen = cell.getRowLength();
282      if (commonPrefix < rLen + KeyValue.ROW_LENGTH_SIZE) {
283        // Previous and current rows are different. Copy the differing part of
284        // the row, skip the column family, and copy the qualifier.
285        PrivateCellUtil.writeRowKeyExcludingCommon(cell, rLen, commonPrefix, out);
286        PrivateCellUtil.writeQualifier(out, cell, cell.getQualifierLength());
287      } else {
288        // The common part includes the whole row. As the column family is the
289        // same across the whole file, it will automatically be included in the
290        // common prefix, so we need not special-case it here.
291        // What we write here is the non common part of the qualifier
292        int commonQualPrefix = commonPrefix - (rLen + KeyValue.ROW_LENGTH_SIZE)
293          - (cell.getFamilyLength() + KeyValue.FAMILY_LENGTH_SIZE);
294        PrivateCellUtil.writeQualifierSkippingBytes(out, cell, cell.getQualifierLength(),
295          commonQualPrefix);
296      }
297      // Write non common ts part
298      out.write(curTsBuf, commonTimestampPrefix, KeyValue.TIMESTAMP_SIZE - commonTimestampPrefix);
299
300      // Write the type if it is not the same as before.
301      if ((flag & FLAG_SAME_TYPE) == 0) {
302        out.write(cell.getTypeByte());
303      }
304
305      // Write the value if it is not the same as before.
306      if ((flag & FLAG_SAME_VALUE) == 0) {
307        PrivateCellUtil.writeValue(out, cell, vLength);
308      }
309    }
310    return kLength + vLength + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
311  }
312
313  @Override
314  protected ByteBuffer internalDecodeKeyValues(DataInputStream source, int allocateHeaderLength,
315    int skipLastBytes, HFileBlockDefaultDecodingContext decodingCtx) throws IOException {
316    int decompressedSize = source.readInt();
317    ByteBuffer buffer = ByteBuffer.allocate(decompressedSize + allocateHeaderLength);
318    buffer.position(allocateHeaderLength);
319    FastDiffCompressionState state = new FastDiffCompressionState();
320    while (source.available() > skipLastBytes) {
321      uncompressSingleKeyValue(source, buffer, state);
322      afterDecodingKeyValue(source, buffer, decodingCtx);
323    }
324
325    if (source.available() != skipLastBytes) {
326      throw new IllegalStateException("Read too much bytes.");
327    }
328
329    return buffer;
330  }
331
332  @Override
333  public ExtendedCell getFirstKeyCellInBlock(ByteBuff block) {
334    block.mark();
335    block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
336    int keyLength = ByteBuff.readCompressedInt(block);
337    // TODO : See if we can avoid these reads as the read values are not getting used
338    ByteBuff.readCompressedInt(block); // valueLength
339    ByteBuff.readCompressedInt(block); // commonLength
340    ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
341    block.reset();
342    return createFirstKeyCell(key, keyLength);
343  }
344
345  @Override
346  public String toString() {
347    return FastDiffDeltaEncoder.class.getSimpleName();
348  }
349
350  protected static class FastDiffSeekerState extends SeekerState {
351    private byte[] prevTimestampAndType = new byte[KeyValue.TIMESTAMP_TYPE_SIZE];
352    private int rowLengthWithSize;
353    private int familyLengthWithSize;
354
355    public FastDiffSeekerState(ObjectIntPair<ByteBuffer> tmpPair, boolean includeTags) {
356      super(tmpPair, includeTags);
357    }
358
359    @Override
360    protected void copyFromNext(SeekerState that) {
361      super.copyFromNext(that);
362      FastDiffSeekerState other = (FastDiffSeekerState) that;
363      System.arraycopy(other.prevTimestampAndType, 0, prevTimestampAndType, 0,
364        KeyValue.TIMESTAMP_TYPE_SIZE);
365      rowLengthWithSize = other.rowLengthWithSize;
366      familyLengthWithSize = other.familyLengthWithSize;
367    }
368  }
369
370  @Override
371  public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
372    return new FastDiffSeekerStateBufferedEncodedSeeker(decodingCtx);
373  }
374
375  private static class FastDiffSeekerStateBufferedEncodedSeeker
376    extends BufferedEncodedSeeker<FastDiffSeekerState> {
377
378    private FastDiffSeekerStateBufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
379      super(decodingCtx);
380    }
381
382    private void decode(boolean isFirst) {
383      byte flag = currentBuffer.get();
384      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
385        if (!isFirst) {
386          System.arraycopy(current.keyBuffer,
387            current.keyLength - current.prevTimestampAndType.length, current.prevTimestampAndType,
388            0, current.prevTimestampAndType.length);
389        }
390        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
391      }
392      if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
393        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
394      }
395      current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
396
397      current.ensureSpaceForKey();
398
399      if (isFirst) {
400        // copy everything
401        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
402          current.keyLength - current.prevTimestampAndType.length);
403        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + Bytes.SIZEOF_SHORT;
404        current.familyLengthWithSize =
405          current.keyBuffer[current.rowLengthWithSize] + Bytes.SIZEOF_BYTE;
406      } else if (current.lastCommonPrefix < Bytes.SIZEOF_SHORT) {
407        // length of row is different, copy everything except family
408
409        // copy the row size
410        int oldRowLengthWithSize = current.rowLengthWithSize;
411        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
412          Bytes.SIZEOF_SHORT - current.lastCommonPrefix);
413        current.rowLengthWithSize = Bytes.toShort(current.keyBuffer, 0) + Bytes.SIZEOF_SHORT;
414
415        // move the column family
416        System.arraycopy(current.keyBuffer, oldRowLengthWithSize, current.keyBuffer,
417          current.rowLengthWithSize, current.familyLengthWithSize);
418
419        // copy the rest of row
420        currentBuffer.get(current.keyBuffer, Bytes.SIZEOF_SHORT,
421          current.rowLengthWithSize - Bytes.SIZEOF_SHORT);
422
423        // copy the qualifier
424        currentBuffer.get(current.keyBuffer,
425          current.rowLengthWithSize + current.familyLengthWithSize,
426          current.keyLength - current.rowLengthWithSize - current.familyLengthWithSize
427            - current.prevTimestampAndType.length);
428      } else if (current.lastCommonPrefix < current.rowLengthWithSize) {
429        // We have to copy part of row and qualifier, but the column family
430        // is in the right place.
431
432        // before column family (rest of row)
433        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
434          current.rowLengthWithSize - current.lastCommonPrefix);
435
436        // after column family (qualifier)
437        currentBuffer.get(current.keyBuffer,
438          current.rowLengthWithSize + current.familyLengthWithSize,
439          current.keyLength - current.rowLengthWithSize - current.familyLengthWithSize
440            - current.prevTimestampAndType.length);
441      } else {
442        // copy just the ending
443        currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
444          current.keyLength - current.prevTimestampAndType.length - current.lastCommonPrefix);
445      }
446
447      // timestamp
448      int pos = current.keyLength - current.prevTimestampAndType.length;
449      int commonTimestampPrefix = (flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH;
450      if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
451        System.arraycopy(current.prevTimestampAndType, 0, current.keyBuffer, pos,
452          commonTimestampPrefix);
453      }
454      pos += commonTimestampPrefix;
455      currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_LONG - commonTimestampPrefix);
456      pos += Bytes.SIZEOF_LONG - commonTimestampPrefix;
457
458      // type
459      if ((flag & FLAG_SAME_TYPE) == 0) {
460        currentBuffer.get(current.keyBuffer, pos, Bytes.SIZEOF_BYTE);
461      } else if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
462        current.keyBuffer[pos] = current.prevTimestampAndType[Bytes.SIZEOF_LONG];
463      }
464
465      // handle value
466      if ((flag & FLAG_SAME_VALUE) == 0) {
467        current.valueOffset = currentBuffer.position();
468        currentBuffer.skip(current.valueLength);
469      }
470
471      if (includesTags()) {
472        decodeTags();
473      }
474      if (includesMvcc()) {
475        current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
476      } else {
477        current.memstoreTS = 0;
478      }
479      current.nextKvOffset = currentBuffer.position();
480    }
481
482    @Override
483    protected void decodeFirst() {
484      currentBuffer.skip(Bytes.SIZEOF_INT);
485      decode(true);
486    }
487
488    @Override
489    protected void decodeNext() {
490      decode(false);
491    }
492
493    @Override
494    protected FastDiffSeekerState createSeekerState() {
495      return new FastDiffSeekerState(this.tmpPair, this.includesTags());
496    }
497  }
498}