/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /**
   * If this feature is enabled, pre-calculate the encoded data size before the real encoding
   * happens.
   */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify the encoded block cache entry size. */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected Cell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times append() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Len of the biggest cell. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell. */
  protected byte[] keyOfBiggestCell;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected Cell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
   * stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileIndexBlockEncoder indexBlockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private Cell lastCellOfPreviousBlock = null;

  /**
   * Additional data items to be written to the "load-on-open" section.
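   * Typically this holds Bloom filter metadata registered through
   * {@link #addGeneralBloomFilter(BloomFilterWriter)} and
   * {@link #addDeleteFamilyBloomFilter(BloomFilterWriter)}.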
   */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
    FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
    } else {
      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
        + cacheConf + " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value is invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
   * writes the file info into the given data output. The reason the data output is not always
   * {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out     the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
    throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = EnvironmentEdgeManager.currentTime();
    fileInfo.write(out);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
  }

  public long getPos() throws IOException {
    return outputStream.getPos();
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
   * @return true if the key is a duplicate of the previously appended key
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext = ");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
    throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /** Returns Path or null if we were passed a stream rather than a Path. */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
      + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors. */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
    Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps. */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter =
      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, writes all the inline blocks and opens a new block.
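   * A data block is considered full once the bytes written so far (or, when
   * hbase.writer.unified.encoded.blocksize.ratio is configured, the encoded size) reach the
   * configured block size, and the block writer's size predicate also agrees.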
   */
  protected void checkBlockBoundary() throws IOException {
    boolean shouldFinishBlock = false;
    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different
    // from 0, and we should use the encoding ratio.
    if (encodedBlockSizeLimit > 0) {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
    } else {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
    }
    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
    if (shouldFinishBlock) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    Cell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter; i.e. takes up less space. This trick is used when building the HFile block index.
   * It is an optimization. It does not always work. In this case we'll just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static Cell getMidpoint(final CellComparator comparator, final Cell left,
    final Cell right) {
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    byte[] midRow;
    boolean bufferBacked =
      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
    } else {
      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
    } else {
      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
        right.getFamilyLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
    } else {
      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
        right.getQualifierLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * Try to get a byte array that falls between left and right, as short as possible in
   * lexicographical order.
   * @return a new minimally sized array that sorts between left and right, or null if
   *         left == right.
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      byte leftByte = leftArray[leftOffset + diffIdx];
      byte rightByte = rightArray[rightOffset + diffIdx];
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
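        // For example, left = "a" and right = "ab" yields "a\x00": one byte longer than left,
        // but still strictly smaller than right.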
        byte[] minimumMidpointArray = new byte[minLength + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /**
   * Try to create a new byte array that falls between left and right, as short as possible in
   * lexicographical order.
   * @return a new minimally sized array that sorts between left and right, or null if
   *         left == right.
   */
  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
    ByteBuffer right, int rightOffset, int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
          minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /**
   * Gives inline block writers an opportunity to contribute blocks.
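   * Called at every data block boundary with {@code closing=false}, and once more with
   * {@code closing=true} when the file is being closed.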
   */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
          blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      try {
        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
          cacheFormatBlock, cacheConf.isInMemory(), true);
      } finally {
        // refCnt will auto-increase when the block is added to the cache, see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata
   * instance. If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Save index block encoder metadata in the file info.
    indexBlockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write meta data blocks,
    // followed by fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (
      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final Cell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();
    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
    }
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
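    // Note: beforeShipped() replaces lastCell (and the other Cell references held by this writer)
    // with compact key-only copies, so a large cell is not pinned across shipping boundaries.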
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Clone the cells that are being held, as the backing buffers may be reused after shipping.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);

    // Biggest cell.
    if (keyOfBiggestCell != null) {
      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
"" : this.getPath().toString(), lenOfBiggestCell, 815 CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false)); 816 } 817 818 if (hFileContext.isIncludesTags()) { 819 // When tags are not being written in this file, MAX_TAGS_LEN is excluded 820 // from the FileInfo 821 fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false); 822 boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE) 823 && hFileContext.isCompressTags(); 824 fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false); 825 } 826 } 827 828 protected int getMajorVersion() { 829 return 3; 830 } 831 832 protected int getMinorVersion() { 833 return HFileReaderImpl.MAX_MINOR_VERSION; 834 } 835 836 protected void finishClose(FixedFileTrailer trailer) throws IOException { 837 // Write out encryption metadata before finalizing if we have a valid crypto context 838 Encryption.Context cryptoContext = hFileContext.getEncryptionContext(); 839 if (cryptoContext != Encryption.Context.NONE) { 840 // Wrap the context's key and write it as the encryption metadata, the wrapper includes 841 // all information needed for decryption 842 trailer.setEncryptionKey(EncryptionUtil.wrapKey( 843 cryptoContext.getConf(), cryptoContext.getConf() 844 .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), 845 cryptoContext.getKey())); 846 } 847 // Now we can finish the close 848 trailer.setMetaIndexCount(metaNames.size()); 849 trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize()); 850 trailer.setEntryCount(entryCount); 851 trailer.setCompressionCodec(hFileContext.getCompression()); 852 853 long startTime = EnvironmentEdgeManager.currentTime(); 854 trailer.serialize(outputStream); 855 HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime); 856 857 if (closeOutputStream) { 858 outputStream.close(); 859 outputStream = null; 860 } 861 } 862}