/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RedundantKVGenerator;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test all of the data block encoding algorithms for correctness. Most of the tests in this class
 * generate data that exercises different branches in the code.
 */
@Category({ IOTests.class, LargeTests.class })
@RunWith(Parameterized.class)
public class TestDataBlockEncoders {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDataBlockEncoders.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestDataBlockEncoders.class);

  private static int NUMBER_OF_KV = 10000;
  private static int NUM_RANDOM_SEEKS = 1000;

  private static int ENCODED_DATA_OFFSET =
    HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE;
  static final byte[] HFILEBLOCK_DUMMY_HEADER = new byte[HConstants.HFILEBLOCK_HEADER_SIZE];

  private final Configuration conf = HBaseConfiguration.create();
  private final RedundantKVGenerator generator = new RedundantKVGenerator();
  private final boolean includesMemstoreTS;
  private final boolean includesTags;
  private final boolean useOffheapData;

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtil.memStoreTSTagsAndOffheapCombination();
  }

  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
    boolean useOffheapData) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTags = includesTag;
    this.useOffheapData = useOffheapData;
  }

  private HFileBlockEncodingContext getEncodingContext(Configuration conf,
    Compression.Algorithm algo, DataBlockEncoding encoding) {
    DataBlockEncoder encoder = encoding.getEncoder();
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(algo).build();
    if (encoder != null) {
      return encoder.newDataBlockEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
    } else {
      return new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, meta);
    }
  }

  /**
   * Test data block encoding of empty KeyValue.
   * @throws IOException On test failure.
   */
  @Test
  public void testEmptyKeyValues() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    if (!includesTags) {
      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value));
    } else {
      byte[] metaValue1 = Bytes.toBytes("metaValue1");
      byte[] metaValue2 = Bytes.toBytes("metaValue2");
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test KeyValues with negative timestamp.
   * @throws IOException On test failure.
   */
  @Test
  public void testNegativeTimestamps() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    if (includesTags) {
      byte[] metaValue1 = Bytes.toBytes("metaValue1");
      byte[] metaValue2 = Bytes.toBytes("metaValue2");
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue1) }));
      kvList.add(new KeyValue(row, family, qualifier, 0L, value,
        new Tag[] { new ArrayBackedTag((byte) 1, metaValue2) }));
    } else {
      kvList.add(new KeyValue(row, family, qualifier, -1L, Type.Put, value));
      kvList.add(new KeyValue(row, family, qualifier, -2L, Type.Put, value));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test whether compression -> decompression gives consistent results on a pseudorandom sample.
   * @throws IOException On test failure.
   */
  @Test
  public void testExecutionOnSample() throws IOException {
    List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  /**
   * Test seeking while file is encoded.
   */
  @Test
  public void testSeekingOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    // create all seekers
    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      LOG.info("Encoding: " + encoding);
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      LOG.info("Encoder: " + encoder);
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
      encodedSeekers.add(seeker);
    }
    LOG.info("Testing it!");
    // test it!
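    // Every encoder's seeker is driven to the same target with both seekBefore=false and
    // seekBefore=true; in the seekBefore case the target index is drawn from [1, size) so that
    // a preceding cell always exists for the seeker to land on.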
    // try a few random seeks
    Random rand = ThreadLocalRandom.current();
    for (boolean seekBefore : new boolean[] { false, true }) {
      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
        int keyValueId;
        if (!seekBefore) {
          keyValueId = rand.nextInt(sampleKv.size());
        } else {
          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
        }

        KeyValue keyValue = sampleKv.get(keyValueId);
        checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
      }
    }

    // check edge cases
    LOG.info("Checking edge cases");
    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
    for (boolean seekBefore : new boolean[] { false, true }) {
      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
    }
    LOG.info("Done");
  }

  @Test
  public void testSeekingToOffHeapKeyValueInSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    // create all seekers
    List<DataBlockEncoder.EncodedSeeker> encodedSeekers = new ArrayList<>();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      LOG.info("Encoding: " + encoding);
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      LOG.info("Encoder: " + encoder);
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
      encodedSeekers.add(seeker);
    }
    LOG.info("Testing it!");
    // test it!
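    // Same consistency check as testSeekingOnSample, except each target KeyValue is first copied
    // into a direct ByteBuffer via buildOffHeapKeyValue so the off-heap cell path is exercised.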
    // try a few random seeks
    Random rand = ThreadLocalRandom.current();
    for (boolean seekBefore : new boolean[] { false, true }) {
      for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
        int keyValueId;
        if (!seekBefore) {
          keyValueId = rand.nextInt(sampleKv.size());
        } else {
          keyValueId = rand.nextInt(sampleKv.size() - 1) + 1;
        }

        KeyValue keyValue = sampleKv.get(keyValueId);
        checkSeekingConsistency(encodedSeekers, seekBefore, buildOffHeapKeyValue(keyValue));
      }
    }

    // check edge cases
    LOG.info("Checking edge cases");
    checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
    for (boolean seekBefore : new boolean[] { false, true }) {
      checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
      KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
      ExtendedCell lastMidKv = PrivateCellUtil.createLastOnRowCol(midKv);
      checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
    }
    LOG.info("Done");
  }

  static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
    HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(HFILEBLOCK_DUMMY_HEADER);
    DataOutputStream dos = new DataOutputStream(baos);
    encoder.startBlockEncoding(encodingContext, dos);
    for (KeyValue kv : kvs) {
      encoder.encode(kv, encodingContext, dos);
    }
    encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
    byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
    System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
    if (useOffheapData) {
      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
      bb.put(encodedData);
      bb.rewind();
      return bb;
    }
    return ByteBuffer.wrap(encodedData);
  }

  @Test
  public void testNextOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      if (encoding.getEncoder() == null) {
        continue;
      }
      DataBlockEncoder encoder = encoding.getEncoder();
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
      DataBlockEncoder.EncodedSeeker seeker =
        encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
      int i = 0;
      do {
        KeyValue expectedKeyValue = sampleKv.get(i);
        ExtendedCell cell = seeker.getCell();
        if (
          PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expectedKeyValue,
            cell) != 0
        ) {
          int commonPrefix =
            PrivateCellUtil.findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
          fail(String.format(
            "next() produces wrong results " + "encoder: %s i: %d commonPrefix: %d"
              + "\n expected %s\n actual %s",
            encoder.toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
              expectedKeyValue.getKeyOffset(), expectedKeyValue.getKeyLength()),
            CellUtil.toString(cell, false)));
        }
        i++;
      } while (seeker.next());
    }
  }

  /**
   * Test whether the decompression of first key is implemented correctly.
   */
  @Test
  public void testFirstKeyInBlockOnSample() throws IOException {
    List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);

    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      if (encoding.getEncoder() == null) {
        continue;
      }
      DataBlockEncoder encoder = encoding.getEncoder();
      ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
        getEncodingContext(conf, Compression.Algorithm.NONE, encoding), this.useOffheapData);
      ExtendedCell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
      KeyValue firstKv = sampleKv.get(0);
      if (0 != PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, key, firstKv)) {
        int commonPrefix = PrivateCellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
        fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
      }
    }
  }

  @Test
  public void testRowIndexWithTagsButNoTagsInCell() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = new byte[0];
    byte[] family = new byte[0];
    byte[] qualifier = new byte[0];
    byte[] value = new byte[0];
    KeyValue expectedKV = new KeyValue(row, family, qualifier, 1L, Type.Put, value);
    kvList.add(expectedKV);
    DataBlockEncoding encoding = DataBlockEncoding.ROW_INDEX_V1;
    DataBlockEncoder encoder = encoding.getEncoder();
    ByteBuffer encodedBuffer =
      encodeKeyValues(encoding, kvList, getEncodingContext(conf, Algorithm.NONE, encoding), false);
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
    DataBlockEncoder.EncodedSeeker seeker =
      encoder.createSeeker(encoder.newDataBlockDecodingContext(conf, meta));
    seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
    Cell cell = seeker.getCell();
    Assert.assertEquals(expectedKV.getLength(), ((KeyValue) cell).getLength());
  }

  private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
    boolean seekBefore, ExtendedCell keyValue) {
    ExtendedCell expectedKeyValue = null;
    ByteBuffer expectedKey = null;
    ByteBuffer expectedValue = null;
    for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
      seeker.seekToKeyInBlock(keyValue, seekBefore);
      seeker.rewind();

      ExtendedCell actualKeyValue = seeker.getCell();
      ByteBuffer actualKey = ByteBuffer.wrap(((KeyValue) seeker.getKey()).getKey());
      ByteBuffer actualValue = seeker.getValueShallowCopy();

      if (expectedKeyValue != null) {
        assertTrue(PrivateCellUtil.equals(expectedKeyValue, actualKeyValue));
      } else {
        expectedKeyValue = actualKeyValue;
      }

      if (expectedKey != null) {
        assertEquals(expectedKey, actualKey);
      } else {
        expectedKey = actualKey;
      }

      if (expectedValue != null) {
        assertEquals(expectedValue, actualValue);
      } else {
        expectedValue = actualValue;
      }
    }
  }

  private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
    boolean includesTags) throws IOException {
    ByteBuffer unencodedDataBuf =
      RedundantKVGenerator.convertKvToByteBuffer(kvList, includesMemstoreTS);
    HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
      .withIncludesTags(includesTags).build();
    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
      DataBlockEncoder encoder = encoding.getEncoder();
      if (encoder == null) {
        continue;
      }
      HFileBlockEncodingContext encodingContext =
        new HFileBlockDefaultEncodingContext(conf, encoding, HFILEBLOCK_DUMMY_HEADER, fileContext);
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      baos.write(HFILEBLOCK_DUMMY_HEADER);
      DataOutputStream dos = new DataOutputStream(baos);
      encoder.startBlockEncoding(encodingContext, dos);
      for (KeyValue kv : kvList) {
        encoder.encode(kv, encodingContext, dos);
      }
      encoder.endBlockEncoding(encodingContext, dos, baos.getBuffer());
      byte[] encodedData = baos.toByteArray();

      testAlgorithm(encodedData, unencodedDataBuf, encoder);
    }
  }

  @Test
  public void testZeroByte() throws IOException {
    List<KeyValue> kvList = new ArrayList<>();
    byte[] row = Bytes.toBytes("abcd");
    byte[] family = new byte[] { 'f' };
    byte[] qualifier0 = new byte[] { 'b' };
    byte[] qualifier1 = new byte[] { 'c' };
    byte[] value0 = new byte[] { 'd' };
    byte[] value1 = new byte[] { 0x00 };
    if (includesTags) {
      kvList.add(new KeyValue(row, family, qualifier0, 0, value0,
        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
      kvList.add(new KeyValue(row, family, qualifier1, 0, value1,
        new Tag[] { new ArrayBackedTag((byte) 1, "value1") }));
    } else {
      kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
      kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
    }
    testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
  }

  private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
    DataBlockEncoder encoder) throws IOException {
    // decode
    ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
      encodedData.length - ENCODED_DATA_OFFSET);
    DataInputStream dis = new DataInputStream(bais);
    ByteBuffer actualDataset;
    HFileContext meta =
      new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
        .withIncludesTags(includesTags).withCompression(Compression.Algorithm.NONE).build();
    actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(conf, meta));
    actualDataset.rewind();

    // this is because in case of prefix tree the decoded stream will not have the mvcc in it
    assertEquals("Encoding -> decoding gives different results for " + encoder,
      Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
  }

  private static ByteBufferKeyValue buildOffHeapKeyValue(KeyValue keyValue) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    keyValue.write(out, false);
    byte[] bytes = out.toByteArray();
    ByteBuffer bb = ByteBuffer.allocateDirect(bytes.length);
    bb.put(bytes);
    bb.flip();

    return new ByteBufferKeyValue(bb, 0, bytes.length);
  }
}