/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RedundantKVGenerator;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test all of the data block encoding algorithms for correctness. Most of the methods in this
 * class generate data designed to exercise different branches in the encoder code.
 */
@Category({ IOTests.class, LargeTests.class })
@RunWith(Parameterized.class)
public class TestDataBlockEncoders {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDataBlockEncoders.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class);

  private static int NUMBER_OF_KV = 10000;
  private static int NUM_RANDOM_SEEKS = 1000;

  private static int ENCODED_DATA_OFFSET =
    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE;
  static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];

  private final Configuration conf = HBaseConfiguration.create();
  private final RedundantKVGenerator generator = new RedundantKVGenerator();
  private final boolean includesMemstoreTS;
  private final boolean includesTags;
  private final boolean useOffheapData;

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.memStoreTSTagsAndOffheapCombination();
  }

  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
    boolean useOffheapData) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTags = includesTag;
    this.useOffheapData = useOffheapData;
  }

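  /**
   * Builds an encoding context for the given encoding, falling back to the default encoding
   * context when the encoding has no dedicated encoder (e.g. NONE).
   */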
  private HFileBlockEncodingContext getEncodingContext(Configuration conf,
    Compression.Algorithm algo, DataBlockEncoding encoding) {
    DataBlockEncoder encoder = encoding.getEncoder();
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(algo).build();
    if (encoder != null) {
      return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
    } else {
      return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
    }
  }

  /**
   * Test data block encoding of empty KeyValues.
   * @throws IOException on test failure
   */
  @Test
  public void testEmptyKeyValues() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    if (!includesTags) {
      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
    } else {
      byte[] metaValue1 = Bytes.toBytes("metaValue1");
      byte[] metaValue2 = Bytes.toBytes("metaValue2");
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test KeyValues with negative timestamps.
   * @throws IOException on test failure
   */
  @Test
  public void testNegativeTimestamps() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    if (includesTags) {
      byte[] metaValue1 = Bytes.toBytes("metaValue1");
      byte[] metaValue2 = Bytes.toBytes("metaValue2");
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
    } else {
      kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
      kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test whether encoding -> decoding gives consistent results on a pseudorandom sample.
   * @throws IOException on test failure
   */
  @Test
  public void testExecutionOnSample() throws IOException {
    List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test seeking within encoded data blocks.
   */
  @Test
  public void testSeekingOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    // create all seekers
    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      LOG.info("Encoding: " + encoding);
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      LOG.info("Encoder: " + encoder);
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
      encodedSeekers.add(seeker);
    }
    LOG.info("Testing it!");
    // test it!
    // try a few random seeks
    Random rand = ThreadLocalRandom.current();
    for (boolean seekBefore : new boolean[] { false, true }) {
      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
        int keyValueId;
        if (!seekBefore) {
          keyValueId = rand.nextInt(sampleKv.size());
        } else {
          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
        }

        KeyValue keyValue = sampleKv.get(keyValueId);
        checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
      }
    }

    // check edge cases
    LOG.info("Checking edge cases");
    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
    for (boolean seekBefore : new boolean[] { false, true }) {
      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
      Cell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
    }
    LOG.info("Done");
  }

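  /**
   * Same as {@link #testSeekingOnSample()}, but the random seeks are performed with off-heap
   * ({@link ByteBufferKeyValue}) copies of the sample cells.
   */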
  @Test
  public void testSeekingToOffHeapKeyValueInSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    // create all seekers
    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      LOG.info("Encoding: " + encoding);
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      LOG.info("Encoder: " + encoder);
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
      encodedSeekers.add(seeker);
    }
    LOG.info("Testing it!");
    // test it!
    // try a few random seeks
    Random rand = ThreadLocalRandom.current();
    for (boolean seekBefore : new boolean[] { false, true }) {
      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
        int keyValueId;
        if (!seekBefore) {
          keyValueId = rand.nextInt(sampleKv.size());
        } else {
          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
        }

        KeyValue keyValue = sampleKv.get(keyValueId);
        checkSeekingConsistency(encodedSeekers, seekBefore, buildOffHeapKeyValue(keyValue));
      }
    }

    // check edge cases
    LOG.info("Checking edge cases");
    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
    for (boolean seekBefore : new boolean[] { false, true }) {
      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
      Cell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
    }
    LOG.info("Done");
  }

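  /**
   * Encodes the given KeyValues with the given encoding and returns a buffer containing only the
   * encoded data (the dummy block header and the encoding id are stripped). Returns a direct
   * buffer when useOffheapData is true.
   */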
  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
    HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(HFILEBLOCK_DUMMY_HEADER);
    DataOutputStream dos = new DataOutputStream(baos);
    encoder.startBlockEncoding(encodingContext, dos);
    for (KeyValue kv : kvs) {
      encoder.encode(kv, encodingContext, dos);
    }
    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
    if (useOffheapData) {
      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
      bb.put(encodedData);
      bb.rewind();
      return bb;
    }
    return ByteBuffer.wrap(encodedData);
  }

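  /**
   * Test that iterating a seeker with next() reproduces the sample KeyValues in order for every
   * encoding.
   */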
  @Test
  public void testNextOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      if (encoding.getEncoder() == null) {
        continue;
      }
      DataBlockEncoder encoder = encoding.getEncoder();
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
      int i = 0;
      do {
        KeyValue expectedKeyValue = sampleKv.get(i);
        Cell cell = seeker.getCell();
        if (
          PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expectedKeyValue,
            cell) != 0
        ) {
          int commonPrefix =
            PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
          fail(String.format(
            "next() produces wrong results " + "encoder: %s i: %d commonPrefix: %d"
              + "\n expected %s\n actual      %s",
            encoder.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
              expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()),
            CellUtil.toString(cell, false)));
        }
        i++;
      } while (seeker.next());
    }
  }

  /**
   * Test whether decoding of the first key in a block is implemented correctly.
   */
  @Test
  public void testFirstKeyInBlockOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      if (encoding.getEncoder() == null) {
        continue;
      }
      DataBlockEncoder encoder = encoding.getEncoder();
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
      KeyValue firstKv = sampleKv.get(0);
      if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
        int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
        fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
      }
    }
  }

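  /**
   * Test that the ROW_INDEX_V1 encoder round-trips a cell that carries no tags, even when the
   * encoding context may be configured with tags support.
   */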
  @Test
  public void testRowIndexWithTagsButNoTagsInCell() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
    kvList.add(expectedKV);
    DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteBuffer encodedBuffer =
      encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false);
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
    DataBlockEncoder.EncodedSeeker seeker =
      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
    Cell cell = seeker.getCell();
    Assert.assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
  }

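  /**
   * Seeks to the given cell with every seeker and asserts that all of them return the same cell,
   * key and value.
   */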
  private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
    boolean seekBefore, Cell keyValue) {
    Cell expectedKeyValue = null;
    ByteBuffer expectedKey = null;
    ByteBuffer expectedValue = null;
    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
      seeker.seekToKeyInBlock(keyValue, seekBefore);
      seeker.rewind();

      Cell actualKeyValue = seeker.getCell();
      ByteBuffer actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey());
      ByteBuffer actualValue = seeker.getValueShallowCopy();

      if (expectedKeyValue != null) {
        assertTrue(CellUtil.equals(expectedKeyValue, actualKeyValue));
      } else {
        expectedKeyValue = actualKeyValue;
      }

      if (expectedKey != null) {
        assertEquals(expectedKey, actualKey);
      } else {
        expectedKey = actualKey;
      }

      if (expectedValue != null) {
        assertEquals(expectedValue, actualValue);
      } else {
        expectedValue = actualValue;
      }
    }
  }

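  /**
   * Runs every available encoder over the given dataset and verifies that encoding followed by
   * decoding reproduces the unencoded data.
   */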
  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
    boolean includesTags) throws IOException {
    ByteBuffer unencodedDataBuf =
      RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS);
    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
      .withIncludesTags(includesTags).build();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      HFileBlockEncodingContext encodingContext =
        new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext);
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      baos.write(HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream dos = new DataOutputStream(baos);
      encoder.startBlockEncoding(encodingContext, dos);
      for (KeyValue kv : kvList) {
        encoder.encode(kv, encodingContext, dos);
      }
      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
      byte[] encodedData = baos.toByteArray();

      testAlgorithm(encodedData, unencodedDataBuf, encoder);
    }
  }

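  /**
   * Test encoding of values that include a single zero byte.
   */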
  @Test
  public void testZeroByte() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = Bytes.toBytes("abcd");
    byte[] family = new byte[] { 'f' };
    byte[] qualifier0 = new byte[] { 'b' };
    byte[] qualifier1 = new byte[] { 'c' };
    byte[] value0 = new byte[] { 'd' };
    byte[] value1 = new byte[] { 0x00 };
    if (includesTags) {
      kvList.add(new KeyValue(row, family, qualifier0, 0, value0,
        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
      kvList.add(new KeyValue(row, family, qualifier1, 0, value1,
        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
    } else {
      kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
      kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

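  /**
   * Decodes the encoded block with the given encoder and asserts that the result matches the
   * original unencoded data.
   */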
  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
    DataBlockEncoder encoder) throws IOException {
    // decode
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
      encodedData.length - ENCODED_DATA_OFFSET);
    DataInputStream dis = new DataInputStream(bais);
    ByteBuffer actualDataset;
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta));
    actualDataset.rewind();

    // This is because, in the case of the prefix tree encoder, the decoded stream will not have
    // the mvcc in it.
    assertEquals("Encoding -> decoding gives different results for " + encoder,
      Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
  }

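  /**
   * Serializes the given KeyValue into a direct ByteBuffer and wraps it as an off-heap
   * {@link ByteBufferKeyValue}.
   */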
  private static ByteBufferKeyValue buildOffHeapKeyValue(KeyValue keyValue) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    keyValue.write(out, false);
    byte[] bytes = out.toByteArray();
    ByteBuffer bb = ByteBuffer.allocateDirect(bytes.length);
    bb.put(bytes);
    bb.flip();

    return new ByteBufferKeyValue(bb, 0, bytes.length);
  }
}