001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.encoding;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.ByteArrayInputStream;
025import java.io.DataInputStream;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.nio.ByteBuffer;
029import java.util.ArrayList;
030import java.util.Collection;
031import java.util.List;
032import java.util.Random;
033import java.util.concurrent.ThreadLocalRandom;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.ArrayBackedTag;
036import org.apache.hadoop.hbase.ByteBufferKeyValue;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.CellComparatorImpl;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.ExtendedCell;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseConfiguration;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.KeyValue;
046import org.apache.hadoop.hbase.KeyValue.Type;
047import org.apache.hadoop.hbase.PrivateCellUtil;
048import org.apache.hadoop.hbase.Tag;
049import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
050import org.apache.hadoop.hbase.io.compress.Compression;
051import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
052import org.apache.hadoop.hbase.io.hfile.HFileContext;
053import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
054import org.apache.hadoop.hbase.nio.SingleByteBuff;
055import org.apache.hadoop.hbase.testclassification.IOTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.RedundantKVGenerator;
059import org.junit.Assert;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.runner.RunWith;
064import org.junit.runners.Parameterized;
065import org.junit.runners.Parameterized.Parameters;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069/**
070 * Test all of the data block encoding algorithms for correctness. Most of the class generate data
071 * which will test different branches in code.
072 */
073@Category({ IOTests.class, LargeTests.class })
074@RunWith(Parameterized.class)
075public class TestDataBlockEncoders {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079    HBaseClassTestRule.forClass(TestDataBlockEncoders.class);
080
081  private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class);
082
083  private static int NUMBER_OF_KV = 10000;
084  private static int NUM_RANDOM_SEEKS = 1000;
085
086  private static int ENCODED_DATA_OFFSET =
087    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE;
088  static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];
089
090  private final Configuration conf = HBaseConfiguration.create();
091  private final RedundantKVGenerator generator = new RedundantKVGenerator();
092  private final boolean includesMemstoreTS;
093  private final boolean includesTags;
094  private final boolean useOffheapData;
095
096  @Parameters
097  public static Collection<Object[]> parameters() {
098    return HBaseTestingUtil.memStoreTSTagsAndOffheapCombination();
099  }
100
101  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
102    boolean useOffheapData) {
103    this.includesMemstoreTS = includesMemstoreTS;
104    this.includesTags = includesTag;
105    this.useOffheapData = useOffheapData;
106  }
107
108  private HFileBlockEncodingContext getEncodingContext(Configuration conf,
109    Compression.Algorithm algo, DataBlockEncoding encoding) {
110    DataBlockEncoder encoder = encoding.getEncoder();
111    HFileContext meta =
112      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
113        .withIncludesTags(includesTags).withCompression(algo).build();
114    if (encoder != null) {
115      return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
116    } else {
117      return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
118    }
119  }
120
121  /**
122   * Test data block encoding of empty KeyValue. On test failure.
123   */
124  @Test
125  public void testEmptyKeyValues() throws IOException {
126    List<KeyValue> kvList = new ArrayList<>();
127    byte[] row = new byte[0];
128    byte[] family = new byte[0];
129    byte[] qualifier = new byte[0];
130    byte[] value = new byte[0];
131    if (!includesTags) {
132      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
133      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
134    } else {
135      byte[] metaValue1 = Bytes.toBytes("metaValue1");
136      byte[] metaValue2 = Bytes.toBytes("metaValue2");
137      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
138        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
139      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
140        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
141    }
142    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
143  }
144
145  /**
146   * Test KeyValues with negative timestamp. On test failure.
147   */
148  @Test
149  public void testNegativeTimestamps() throws IOException {
150    List<KeyValue> kvList = new ArrayList<>();
151    byte[] row = new byte[0];
152    byte[] family = new byte[0];
153    byte[] qualifier = new byte[0];
154    byte[] value = new byte[0];
155    if (includesTags) {
156      byte[] metaValue1 = Bytes.toBytes("metaValue1");
157      byte[] metaValue2 = Bytes.toBytes("metaValue2");
158      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
159        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
160      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
161        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
162    } else {
163      kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
164      kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
165    }
166    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
167  }
168
169  /**
170   * Test whether compression -> decompression gives the consistent results on pseudorandom sample.
171   * @throws IOException On test failure.
172   */
173  @Test
174  public void testExecutionOnSample() throws IOException {
175    List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
176    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
177  }
178
179  /**
180   * Test seeking while file is encoded.
181   */
182  @Test
183  public void testSeekingOnSample() throws IOException {
184    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
185
186    // create all seekers
187    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
188    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
189      LOG.info("Encoding: " + encoding);
190      DataBlockEncoder encoder = encoding.getEncoder();
191      if (encoder == null) {
192        continue;
193      }
194      LOG.info("Encoder: " + encoder);
195      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
196        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
197      HFileContext meta =
198        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
199          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
200      DataBlockEncoder.EncodedSeeker seeker =
201        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
202      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
203      encodedSeekers.add(seeker);
204    }
205    LOG.info("Testing it!");
206    // test it!
207    // try a few random seeks
208    Random rand = ThreadLocalRandom.current();
209    for (boolean seekBefore : new boolean[] { false, true }) {
210      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
211        int keyValueId;
212        if (!seekBefore) {
213          keyValueId = rand.nextInt(sampleKv.size());
214        } else {
215          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
216        }
217
218        KeyValue keyValue = sampleKv.get(keyValueId);
219        checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
220      }
221    }
222
223    // check edge cases
224    LOG.info("Checking edge cases");
225    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
226    for (boolean seekBefore : new boolean[] { false, true }) {
227      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
228      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
229      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
230      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
231    }
232    LOG.info("Done");
233  }
234
235  @Test
236  public void testSeekingToOffHeapKeyValueInSample() throws IOException {
237    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
238
239    // create all seekers
240    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
241    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
242      LOG.info("Encoding: " + encoding);
243      DataBlockEncoder encoder = encoding.getEncoder();
244      if (encoder == null) {
245        continue;
246      }
247      LOG.info("Encoder: " + encoder);
248      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
249        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
250      HFileContext meta =
251        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
252          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
253      DataBlockEncoder.EncodedSeeker seeker =
254        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
255      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
256      encodedSeekers.add(seeker);
257    }
258    LOG.info("Testing it!");
259    // test it!
260    // try a few random seeks
261    Random rand = ThreadLocalRandom.current();
262    for (boolean seekBefore : new boolean[] { false, true }) {
263      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
264        int keyValueId;
265        if (!seekBefore) {
266          keyValueId = rand.nextInt(sampleKv.size());
267        } else {
268          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
269        }
270
271        KeyValue keyValue = sampleKv.get(keyValueId);
272        checkSeekingConsistency(encodedSeekers, seekBefore, buildOffHeapKeyValue(keyValue));
273      }
274    }
275
276    // check edge cases
277    LOG.info("Checking edge cases");
278    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
279    for (boolean seekBefore : new boolean[] { false, true }) {
280      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
281      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
282      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
283      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
284    }
285    LOG.info("Done");
286  }
287
288  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
289    HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
290    DataBlockEncoder encoder = encoding.getEncoder();
291    ByteArrayOutputStream baos = new ByteArrayOutputStream();
292    baos.write(HFILEBLOCK_DUMMY_HEADER);
293    DataOutputStream dos = new DataOutputStream(baos);
294    encoder.startBlockEncoding(encodingContext, dos);
295    for (KeyValue kv : kvs) {
296      encoder.encode(kv, encodingContext, dos);
297    }
298    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
299    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
300    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
301    if (useOffheapData) {
302      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
303      bb.put(encodedData);
304      bb.rewind();
305      return bb;
306    }
307    return ByteBuffer.wrap(encodedData);
308  }
309
310  @Test
311  public void testNextOnSample() throws IOException {
312    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
313
314    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
315      if (encoding.getEncoder() == null) {
316        continue;
317      }
318      DataBlockEncoder encoder = encoding.getEncoder();
319      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
320        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
321      HFileContext meta =
322        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
323          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
324      DataBlockEncoder.EncodedSeeker seeker =
325        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
326      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
327      int i = 0;
328      do {
329        KeyValue expectedKeyValue = sampleKv.get(i);
330        ExtendedCell cell = seeker.getCell();
331        if (
332          PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expectedKeyValue,
333            cell) != 0
334        ) {
335          int commonPrefix =
336            PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
337          fail(String.format(
338            "next() produces wrong results " + "encoder: %s i: %d commonPrefix: %d"
339              + "\n expected %s\n actual      %s",
340            encoder.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
341              expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()),
342            CellUtil.toString(cell, false)));
343        }
344        i++;
345      } while (seeker.next());
346    }
347  }
348
349  /**
350   * Test whether the decompression of first key is implemented correctly.
351   */
352  @Test
353  public void testFirstKeyInBlockOnSample() throws IOException {
354    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
355
356    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
357      if (encoding.getEncoder() == null) {
358        continue;
359      }
360      DataBlockEncoder encoder = encoding.getEncoder();
361      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
362        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
363      ExtendedCell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
364      KeyValue firstKv = sampleKv.get(0);
365      if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
366        int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
367        fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
368      }
369    }
370  }
371
372  @Test
373  public void testRowIndexWithTagsButNoTagsInCell() throws IOException {
374    List<KeyValue> kvList = new ArrayList<>();
375    byte[] row = new byte[0];
376    byte[] family = new byte[0];
377    byte[] qualifier = new byte[0];
378    byte[] value = new byte[0];
379    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
380    kvList.add(expectedKV);
381    DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
382    DataBlockEncoder encoder = encoding.getEncoder();
383    ByteBuffer encodedBuffer =
384      encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false);
385    HFileContext meta =
386      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
387        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
388    DataBlockEncoder.EncodedSeeker seeker =
389      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
390    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
391    Cell cell = seeker.getCell();
392    Assert.assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
393  }
394
395  private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
396    boolean seekBefore, ExtendedCell keyValue) {
397    ExtendedCell expectedKeyValue = null;
398    ByteBuffer expectedKey = null;
399    ByteBuffer expectedValue = null;
400    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
401      seeker.seekToKeyInBlock(keyValue, seekBefore);
402      seeker.rewind();
403
404      ExtendedCell actualKeyValue = seeker.getCell();
405      ByteBuffer actualKey = null;
406      actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey());
407      ByteBuffer actualValue = seeker.getValueShallowCopy();
408
409      if (expectedKeyValue != null) {
410        assertTrue(PrivateCellUtil.equals(expectedKeyValue, actualKeyValue));
411      } else {
412        expectedKeyValue = actualKeyValue;
413      }
414
415      if (expectedKey != null) {
416        assertEquals(expectedKey, actualKey);
417      } else {
418        expectedKey = actualKey;
419      }
420
421      if (expectedValue != null) {
422        assertEquals(expectedValue, actualValue);
423      } else {
424        expectedValue = actualValue;
425      }
426    }
427  }
428
429  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
430    boolean includesTags) throws IOException {
431    ByteBuffer unencodedDataBuf =
432      RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS);
433    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
434      .withIncludesTags(includesTags).build();
435    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
436      DataBlockEncoder encoder = encoding.getEncoder();
437      if (encoder == null) {
438        continue;
439      }
440      HFileBlockEncodingContext encodingContext =
441        new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext);
442      ByteArrayOutputStream baos = new ByteArrayOutputStream();
443      baos.write(HFILEBLOCK_DUMMY_HEADER);
444      DataOutputStream dos = new DataOutputStream(baos);
445      encoder.startBlockEncoding(encodingContext, dos);
446      for (KeyValue kv : kvList) {
447        encoder.encode(kv, encodingContext, dos);
448      }
449      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
450      byte[] encodedData = baos.toByteArray();
451
452      testAlgorithm(encodedData, unencodedDataBuf, encoder);
453    }
454  }
455
456  @Test
457  public void testZeroByte() throws IOException {
458    List<KeyValue> kvList = new ArrayList<>();
459    byte[] row = Bytes.toBytes("abcd");
460    byte[] family = new byte[] { 'f' };
461    byte[] qualifier0 = new byte[] { 'b' };
462    byte[] qualifier1 = new byte[] { 'c' };
463    byte[] value0 = new byte[] { 'd' };
464    byte[] value1 = new byte[] { 0x00 };
465    if (includesTags) {
466      kvList.add(new KeyValue(row, family, qualifier0, 0, value0,
467        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
468      kvList.add(new KeyValue(row, family, qualifier1, 0, value1,
469        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
470    } else {
471      kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
472      kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
473    }
474    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
475  }
476
477  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
478    DataBlockEncoder encoder) throws IOException {
479    // decode
480    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
481      encodedData.length - ENCODED_DATA_OFFSET);
482    DataInputStream dis = new DataInputStream(bais);
483    ByteBuffer actualDataset;
484    HFileContext meta =
485      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
486        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
487    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta));
488    actualDataset.rewind();
489
490    // this is because in case of prefix tree the decoded stream will not have
491    // the
492    // mvcc in it.
493    assertEquals("Encoding -> decoding gives different results for " + encoder,
494      Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
495  }
496
497  private static ByteBufferKeyValue buildOffHeapKeyValue(KeyValue keyValue) throws IOException {
498    ByteArrayOutputStream out = new ByteArrayOutputStream();
499    keyValue.write(out, false);
500    byte[] bytes = out.toByteArray();
501    ByteBuffer bb = ByteBuffer.allocateDirect(bytes.length);
502    bb.put(bytes);
503    bb.flip();
504
505    return new ByteBufferKeyValue(bb, 0, bytes.length);
506  }
507}