001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.encoding; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.ByteArrayInputStream; 025import java.io.DataInputStream; 026import java.io.DataOutputStream; 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.List; 032import java.util.Random; 033import java.util.concurrent.ThreadLocalRandom; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.ArrayBackedTag; 036import org.apache.hadoop.hbase.ByteBufferKeyValue; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.CellComparatorImpl; 039import org.apache.hadoop.hbase.CellUtil; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseConfiguration; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.KeyValue; 045import org.apache.hadoop.hbase.KeyValue.Type; 046import org.apache.hadoop.hbase.PrivateCellUtil; 047import org.apache.hadoop.hbase.Tag; 048import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 049import org.apache.hadoop.hbase.io.compress.Compression; 050import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 051import org.apache.hadoop.hbase.io.hfile.HFileContext; 052import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 053import org.apache.hadoop.hbase.nio.SingleByteBuff; 054import org.apache.hadoop.hbase.testclassification.IOTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.apache.hadoop.hbase.util.RedundantKVGenerator; 058import org.junit.Assert; 059import org.junit.ClassRule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.runner.RunWith; 063import org.junit.runners.Parameterized; 064import org.junit.runners.Parameterized.Parameters; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068/** 069 * Test all of the data block encoding algorithms for correctness. Most of the class generate data 070 * which will test different branches in code. 071 */ 072@Category({ IOTests.class, LargeTests.class }) 073@RunWith(Parameterized.class) 074public class TestDataBlockEncoders { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestDataBlockEncoders.class); 079 080 private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class); 081 082 private static int NUMBER_OF_KV = 10000; 083 private static int NUM_RANDOM_SEEKS = 1000; 084 085 private static int ENCODED_DATA_OFFSET = 086 HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE; 087 static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; 088 089 private final Configuration conf = HBaseConfiguration.create(); 090 private final RedundantKVGenerator generator = new RedundantKVGenerator(); 091 private final boolean includesMemstoreTS; 092 private final boolean includesTags; 093 private final boolean useOffheapData; 094 095 @Parameters 096 public static Collection<Object[]> parameters() { 097 return HBaseTestingUtility.memStoreTSTagsAndOffheapCombination(); 098 } 099 100 public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag, 101 boolean useOffheapData) { 102 this.includesMemstoreTS = includesMemstoreTS; 103 this.includesTags = includesTag; 104 this.useOffheapData = useOffheapData; 105 } 106 107 private HFileBlockEncodingContext getEncodingContext(Configuration conf, 108 Compression.Algorithm algo, DataBlockEncoding encoding) { 109 DataBlockEncoder encoder = encoding.getEncoder(); 110 HFileContext meta = 111 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 112 .withIncludesTags(includesTags).withCompression(algo).build(); 113 if (encoder != null) { 114 return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta); 115 } else { 116 return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta); 117 } 118 } 119 120 /** 121 * Test data block encoding of empty KeyValue. On test failure. 122 */ 123 @Test 124 public void testEmptyKeyValues() throws IOException { 125 List<KeyValue> kvList = new ArrayList<>(); 126 byte[] row = new byte[0]; 127 byte[] family = new byte[0]; 128 byte[] qualifier = new byte[0]; 129 byte[] value = new byte[0]; 130 if (!includesTags) { 131 kvList.add(new KeyValue(row, family, qualifier, 0L, value)); 132 kvList.add(new KeyValue(row, family, qualifier, 0L, value)); 133 } else { 134 byte[] metaValue1 = Bytes.toBytes("metaValue1"); 135 byte[] metaValue2 = Bytes.toBytes("metaValue2"); 136 kvList.add(new KeyValue(row, family, qualifier, 0L, value, 137 new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) })); 138 kvList.add(new KeyValue(row, family, qualifier, 0L, value, 139 new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) })); 140 } 141 testEncodersOnDataset(kvList, includesMemstoreTS, includesTags); 142 } 143 144 /** 145 * Test KeyValues with negative timestamp. On test failure. 146 */ 147 @Test 148 public void testNegativeTimestamps() throws IOException { 149 List<KeyValue> kvList = new ArrayList<>(); 150 byte[] row = new byte[0]; 151 byte[] family = new byte[0]; 152 byte[] qualifier = new byte[0]; 153 byte[] value = new byte[0]; 154 if (includesTags) { 155 byte[] metaValue1 = Bytes.toBytes("metaValue1"); 156 byte[] metaValue2 = Bytes.toBytes("metaValue2"); 157 kvList.add(new KeyValue(row, family, qualifier, 0L, value, 158 new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) })); 159 kvList.add(new KeyValue(row, family, qualifier, 0L, value, 160 new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) })); 161 } else { 162 kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value)); 163 kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value)); 164 } 165 testEncodersOnDataset(kvList, includesMemstoreTS, includesTags); 166 } 167 168 /** 169 * Test whether compression -> decompression gives the consistent results on pseudorandom sample. 170 * @throws IOException On test failure. 171 */ 172 @Test 173 public void testExecutionOnSample() throws IOException { 174 List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags); 175 testEncodersOnDataset(kvList, includesMemstoreTS, includesTags); 176 } 177 178 /** 179 * Test seeking while file is encoded. 180 */ 181 @Test 182 public void testSeekingOnSample() throws IOException { 183 List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags); 184 185 // create all seekers 186 List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>(); 187 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 188 LOG.info("Encoding: " + encoding); 189 DataBlockEncoder encoder = encoding.getEncoder(); 190 if (encoder == null) { 191 continue; 192 } 193 LOG.info("Encoder: " + encoder); 194 ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv, 195 getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData); 196 HFileContext meta = 197 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 198 .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build(); 199 DataBlockEncoder.EncodedSeeker seeker = 200 encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta)); 201 seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer)); 202 encodedSeekers.add(seeker); 203 } 204 LOG.info("Testing it!"); 205 // test it! 206 // try a few random seeks 207 Random rand = ThreadLocalRandom.current(); 208 for (boolean seekBefore : new boolean[] { false, true }) { 209 for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) { 210 int keyValueId; 211 if (!seekBefore) { 212 keyValueId = rand.nextInt(sampleKv.size()); 213 } else { 214 keyValueId = rand.nextInt(sampleKv.size() - 1) + 1; 215 } 216 217 KeyValue keyValue = sampleKv.get(keyValueId); 218 checkSeekingConsistency(encodedSeekers, seekBefore, keyValue); 219 } 220 } 221 222 // check edge cases 223 LOG.info("Checking edge cases"); 224 checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0)); 225 for (boolean seekBefore : new boolean[] { false, true }) { 226 checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1)); 227 KeyValue midKv = sampleKv.get(sampleKv.size() / 2); 228 Cell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv); 229 checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv); 230 } 231 LOG.info("Done"); 232 } 233 234 @Test 235 public void testSeekingToOffHeapKeyValueInSample() throws IOException { 236 List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags); 237 238 // create all seekers 239 List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>(); 240 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 241 LOG.info("Encoding: " + encoding); 242 DataBlockEncoder encoder = encoding.getEncoder(); 243 if (encoder == null) { 244 continue; 245 } 246 LOG.info("Encoder: " + encoder); 247 ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv, 248 getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData); 249 HFileContext meta = 250 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 251 .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build(); 252 DataBlockEncoder.EncodedSeeker seeker = 253 encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta)); 254 seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer)); 255 encodedSeekers.add(seeker); 256 } 257 LOG.info("Testing it!"); 258 // test it! 259 // try a few random seeks 260 Random rand = ThreadLocalRandom.current(); 261 for (boolean seekBefore : new boolean[] { false, true }) { 262 for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) { 263 int keyValueId; 264 if (!seekBefore) { 265 keyValueId = rand.nextInt(sampleKv.size()); 266 } else { 267 keyValueId = rand.nextInt(sampleKv.size() - 1) + 1; 268 } 269 270 KeyValue keyValue = sampleKv.get(keyValueId); 271 checkSeekingConsistency(encodedSeekers, seekBefore, buildOffHeapKeyValue(keyValue)); 272 } 273 } 274 275 // check edge cases 276 LOG.info("Checking edge cases"); 277 checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0)); 278 for (boolean seekBefore : new boolean[] { false, true }) { 279 checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1)); 280 KeyValue midKv = sampleKv.get(sampleKv.size() / 2); 281 Cell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv); 282 checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv); 283 } 284 LOG.info("Done"); 285 } 286 287 static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs, 288 HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException { 289 DataBlockEncoder encoder = encoding.getEncoder(); 290 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 291 baos.write(HFILEBLOCK_DUMMY_HEADER); 292 DataOutputStream dos = new DataOutputStream(baos); 293 encoder.startBlockEncoding(encodingContext, dos); 294 for (KeyValue kv : kvs) { 295 encoder.encode(kv, encodingContext, dos); 296 } 297 encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer()); 298 byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET]; 299 System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length); 300 if (useOffheapData) { 301 ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length); 302 bb.put(encodedData); 303 bb.rewind(); 304 return bb; 305 } 306 return ByteBuffer.wrap(encodedData); 307 } 308 309 @Test 310 public void testNextOnSample() throws IOException { 311 List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags); 312 313 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 314 if (encoding.getEncoder() == null) { 315 continue; 316 } 317 DataBlockEncoder encoder = encoding.getEncoder(); 318 ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv, 319 getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData); 320 HFileContext meta = 321 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 322 .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build(); 323 DataBlockEncoder.EncodedSeeker seeker = 324 encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta)); 325 seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer)); 326 int i = 0; 327 do { 328 KeyValue expectedKeyValue = sampleKv.get(i); 329 Cell cell = seeker.getCell(); 330 if ( 331 PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expectedKeyValue, 332 cell) != 0 333 ) { 334 int commonPrefix = 335 PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true); 336 fail(String.format( 337 "next() produces wrong results " + "encoder: %s i: %d commonPrefix: %d" 338 + "\n expected %s\n actual %s", 339 encoder.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(), 340 expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()), 341 CellUtil.toString(cell, false))); 342 } 343 i++; 344 } while (seeker.next()); 345 } 346 } 347 348 /** 349 * Test whether the decompression of first key is implemented correctly. 350 */ 351 @Test 352 public void testFirstKeyInBlockOnSample() throws IOException { 353 List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags); 354 355 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 356 if (encoding.getEncoder() == null) { 357 continue; 358 } 359 DataBlockEncoder encoder = encoding.getEncoder(); 360 ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv, 361 getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData); 362 Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer)); 363 KeyValue firstKv = sampleKv.get(0); 364 if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) { 365 int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true); 366 fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix)); 367 } 368 } 369 } 370 371 @Test 372 public void testRowIndexWithTagsButNoTagsInCell() throws IOException { 373 List<KeyValue> kvList = new ArrayList<>(); 374 byte[] row = new byte[0]; 375 byte[] family = new byte[0]; 376 byte[] qualifier = new byte[0]; 377 byte[] value = new byte[0]; 378 KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value); 379 kvList.add(expectedKV); 380 DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1; 381 DataBlockEncoder encoder = encoding.getEncoder(); 382 ByteBuffer encodedBuffer = 383 encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false); 384 HFileContext meta = 385 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 386 .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build(); 387 DataBlockEncoder.EncodedSeeker seeker = 388 encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta)); 389 seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer)); 390 Cell cell = seeker.getCell(); 391 Assert.assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength()); 392 } 393 394 private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers, 395 boolean seekBefore, Cell keyValue) { 396 Cell expectedKeyValue = null; 397 ByteBuffer expectedKey = null; 398 ByteBuffer expectedValue = null; 399 for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) { 400 seeker.seekToKeyInBlock(keyValue, seekBefore); 401 seeker.rewind(); 402 403 Cell actualKeyValue = seeker.getCell(); 404 ByteBuffer actualKey = null; 405 actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey()); 406 ByteBuffer actualValue = seeker.getValueShallowCopy(); 407 408 if (expectedKeyValue != null) { 409 assertTrue(CellUtil.equals(expectedKeyValue, actualKeyValue)); 410 } else { 411 expectedKeyValue = actualKeyValue; 412 } 413 414 if (expectedKey != null) { 415 assertEquals(expectedKey, actualKey); 416 } else { 417 expectedKey = actualKey; 418 } 419 420 if (expectedValue != null) { 421 assertEquals(expectedValue, actualValue); 422 } else { 423 expectedValue = actualValue; 424 } 425 } 426 } 427 428 private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS, 429 boolean includesTags) throws IOException { 430 ByteBuffer unencodedDataBuf = 431 RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS); 432 HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS) 433 .withIncludesTags(includesTags).build(); 434 for (DataBlockEncoding encoding : DataBlockEncoding.values()) { 435 DataBlockEncoder encoder = encoding.getEncoder(); 436 if (encoder == null) { 437 continue; 438 } 439 HFileBlockEncodingContext encodingContext = 440 new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext); 441 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 442 baos.write(HFILEBLOCK_DUMMY_HEADER); 443 DataOutputStream dos = new DataOutputStream(baos); 444 encoder.startBlockEncoding(encodingContext, dos); 445 for (KeyValue kv : kvList) { 446 encoder.encode(kv, encodingContext, dos); 447 } 448 encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer()); 449 byte[] encodedData = baos.toByteArray(); 450 451 testAlgorithm(encodedData, unencodedDataBuf, encoder); 452 } 453 } 454 455 @Test 456 public void testZeroByte() throws IOException { 457 List<KeyValue> kvList = new ArrayList<>(); 458 byte[] row = Bytes.toBytes("abcd"); 459 byte[] family = new byte[] { 'f' }; 460 byte[] qualifier0 = new byte[] { 'b' }; 461 byte[] qualifier1 = new byte[] { 'c' }; 462 byte[] value0 = new byte[] { 'd' }; 463 byte[] value1 = new byte[] { 0x00 }; 464 if (includesTags) { 465 kvList.add(new KeyValue(row, family, qualifier0, 0, value0, 466 new Tag[] { new ArrayBackedTag((byte) 1, "value1") })); 467 kvList.add(new KeyValue(row, family, qualifier1, 0, value1, 468 new Tag[] { new ArrayBackedTag((byte) 1, "value1") })); 469 } else { 470 kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0)); 471 kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1)); 472 } 473 testEncodersOnDataset(kvList, includesMemstoreTS, includesTags); 474 } 475 476 private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf, 477 DataBlockEncoder encoder) throws IOException { 478 // decode 479 ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET, 480 encodedData.length - ENCODED_DATA_OFFSET); 481 DataInputStream dis = new DataInputStream(bais); 482 ByteBuffer actualDataset; 483 HFileContext meta = 484 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 485 .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build(); 486 actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta)); 487 actualDataset.rewind(); 488 489 // this is because in case of prefix tree the decoded stream will not have 490 // the 491 // mvcc in it. 492 assertEquals("Encoding -> decoding gives different results for " + encoder, 493 Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset)); 494 } 495 496 private static ByteBufferKeyValue buildOffHeapKeyValue(KeyValue keyValue) throws IOException { 497 ByteArrayOutputStream out = new ByteArrayOutputStream(); 498 keyValue.write(out, false); 499 byte[] bytes = out.toByteArray(); 500 ByteBuffer bb = ByteBuffer.allocateDirect(bytes.length); 501 bb.put(bytes); 502 bb.flip(); 503 504 return new ByteBufferKeyValue(bb, 0, bytes.length); 505 } 506}