/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common functionality needed by all versions of {@link HFile} writers.
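 * <p>
 * In outline: cells are appended in comparator order via {@code append(ExtendedCell)};
 * {@code checkBlockBoundary()} finishes the current data block once it reaches the configured
 * block size (or the encoded-size limit when {@code hbase.writer.unified.encoded.blocksize.ratio}
 * is set), at which point inline block writers (the data block index and, if registered, compound
 * Bloom filter writers) may contribute blocks; {@code close()} then writes any meta blocks, the
 * load-on-open section (data block index, meta block index, file info and Bloom filter metadata)
 * and the fixed file trailer.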
 */
@InterfaceAudience.Private
public class HFileWriterImpl implements HFile.Writer {
  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);

  private static final long UNSET = -1;

  /** If this feature is enabled, pre-calculate the encoded data size before the real encoding happens. */
  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
    "hbase.writer.unified.encoded.blocksize.ratio";

  /** Block size limit after encoding, used to unify encoded block Cache entry size */
  private final int encodedBlockSizeLimit;

  /** The Cell previously appended. Becomes the last cell in the file. */
  protected ExtendedCell lastCell = null;

  /** FileSystem stream to write into. */
  protected FSDataOutputStream outputStream;

  /** True if we opened the <code>outputStream</code> (and so will close it). */
  protected final boolean closeOutputStream;

  /** A "file info" block: a key-value map of file-wide metadata. */
  protected HFileInfo fileInfo = new HFileInfo();

  /** Total # of key/value entries, i.e. how many times add() was called. */
  protected long entryCount = 0;

  /** Used for calculating the average key length. */
  protected long totalKeyLength = 0;

  /** Used for calculating the average value length. */
  protected long totalValueLength = 0;

  /** Len of the biggest cell. */
  protected long lenOfBiggestCell = 0;
  /** Key of the biggest cell. */
  protected byte[] keyOfBiggestCell;

  /** Total uncompressed bytes, maybe calculate a compression ratio later. */
  protected long totalUncompressedBytes = 0;

  /** Meta block names. */
  protected List<byte[]> metaNames = new ArrayList<>();

  /** {@link Writable}s representing meta block data. */
  protected List<Writable> metaData = new ArrayList<>();

  /**
   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
   */
  protected ExtendedCell firstCellInBlock = null;

  /** May be null if we were passed a stream. */
  protected final Path path;

  /** Cache configuration for caching data on write. */
  protected final CacheConfig cacheConf;

  /**
   * Name for this object used when logging or in toString. Is either the result of a toString on
   * stream or else name of passed file Path.
   */
  protected final String name;

  /**
   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
   * no encoding.
   */
  protected final HFileDataBlockEncoder blockEncoder;

  protected final HFileIndexBlockEncoder indexBlockEncoder;

  protected final HFileContext hFileContext;

  private int maxTagsLength = 0;

  /** KeyValue version in FileInfo */
  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");

  /** Version for KeyValue which includes memstore timestamp */
  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;

  /** Inline block writers for multi-level block index and compound Blooms. */
  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();

  /** block writer */
  protected HFileBlock.Writer blockWriter;

  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;

  /** The offset of the first data block or -1 if the file is empty. */
  private long firstDataBlockOffset = UNSET;

  /** The offset of the last data block or -1 if the file is empty. */
  protected long lastDataBlockOffset = UNSET;

  /**
   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
   * write hfiles in a burst.
   */
  private ExtendedCell lastCellOfPreviousBlock = null;

  /** Additional data items to be written to the "load-on-open" section. */
  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();

  protected long maxMemstoreTS = 0;

  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
    FSDataOutputStream outputStream, HFileContext fileContext) {
    this.outputStream = outputStream;
    this.path = path;
    this.name = path != null ? path.getName() : outputStream.toString();
    this.hFileContext = fileContext;
    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
    if (encoding != DataBlockEncoding.NONE) {
      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
    } else {
      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
    }
    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
    } else {
      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
    }
    closeOutputStream = path != null;
    this.cacheConf = cacheConf;
    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

    finishInit(conf);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
        + cacheConf + " fileContext: " + fileContext);
    }
  }

  /**
   * Add to the file info. All added key/value pairs can be obtained using
   * {@link HFile.Reader#getHFileInfo()}.
   * @param k Key
   * @param v Value
   * @throws IOException in case the key or the value are invalid
   */
  @Override
  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
    fileInfo.append(k, v, true);
  }

  /**
   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
   * writes the file info into the given data output. The reason the data output is not always
   * {@link #outputStream} is that we store file info as a block in version 2.
   * @param trailer fixed file trailer
   * @param out     the data output to write the file info to
   */
  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
    throws IOException {
    trailer.setFileInfoOffset(outputStream.getPos());
    finishFileInfo();
    long startTime = EnvironmentEdgeManager.currentTime();
    fileInfo.write(out);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
  }

  public long getPos() throws IOException {
    return outputStream.getPos();
  }

  /**
   * Checks that the given Cell's key does not violate the key order.
   * @param cell Cell whose key to check.
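   *             The comparison uses the file's CellComparator and ignores the MVCC sequence id.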
   * @return true if the key is duplicate
   * @throws IOException if the key or the key order is wrong
   */
  protected boolean checkKey(final Cell cell) throws IOException {
    boolean isDuplicateKey = false;

    if (cell == null) {
      throw new IOException("Key cannot be null or empty");
    }
    if (lastCell != null) {
      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
        lastCell, cell);
      if (keyComp > 0) {
        String message = getLexicalErrorMessage(cell);
        throw new IOException(message);
      } else if (keyComp == 0) {
        isDuplicateKey = true;
      }
    }
    return isDuplicateKey;
  }

  private String getLexicalErrorMessage(Cell cell) {
    StringBuilder sb = new StringBuilder();
    sb.append("Added a key not lexically larger than previous. Current cell = ");
    sb.append(cell);
    sb.append(", lastCell = ");
    sb.append(lastCell);
    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
    sb.append(hFileContext);
    return sb.toString();
  }

  /** Checks the given value for validity. */
  protected void checkValue(final byte[] value, final int offset, final int length)
    throws IOException {
    if (value == null) {
      throw new IOException("Value cannot be null");
    }
  }

  /** Returns Path or null if we were passed a stream rather than a Path. */
  @Override
  public Path getPath() {
    return path;
  }

  @Override
  public String toString() {
    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
      + hFileContext.getCompression().getName();
  }

  public static Compression.Algorithm compressionByName(String algoName) {
    if (algoName == null) {
      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
    }
    return Compression.getCompressionAlgorithmByName(algoName);
  }

  /** A helper method to create HFile output streams in constructors */
  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
    Path path, InetSocketAddress[] favoredNodes) throws IOException {
    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
    return FSUtils.create(conf, fs, path, perms, favoredNodes);
  }

  /** Additional initialization steps */
  protected void finishInit(final Configuration conf) {
    if (blockWriter != null) {
      throw new IllegalStateException("finishInit called twice");
    }
    blockWriter =
      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
    // Data block index writer
    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
    inlineBlockWriters.add(dataBlockIndexWriter);

    // Meta data block index writer
    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
    LOG.trace("Initialized with {}", cacheConf);
  }

  /**
   * At a block boundary, write all the inline blocks and open a new block.
   */
  protected void checkBlockBoundary() throws IOException {
    boolean shouldFinishBlock = false;
    // A non-zero limit means hbase.writer.unified.encoded.blocksize.ratio was set to something
    // other than 0, so we should use the encoded size.
    if (encodedBlockSizeLimit > 0) {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
    } else {
      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
    }
    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
    if (shouldFinishBlock) {
      finishBlock();
      writeInlineBlocks(false);
      newBlock();
    }
  }

  /** Clean up the data block that is currently being written. */
  private void finishBlock() throws IOException {
    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
      return;
    }

    // Update the first data block offset if UNSET; used when scanning.
    if (firstDataBlockOffset == UNSET) {
      firstDataBlockOffset = outputStream.getPos();
    }
    // Update the last data block offset each time through here.
    lastDataBlockOffset = outputStream.getPos();
    blockWriter.writeHeaderAndData(outputStream);
    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
    ExtendedCell indexEntry =
      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
      lastDataBlockOffset, onDiskSize);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    if (cacheConf.shouldCacheDataOnWrite()) {
      doCacheOnWrite(lastDataBlockOffset);
    }
  }

  /**
   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter; i.e. takes up less space. This trick is used when building the HFile block index. It
   * is an optimization. It does not always work; in that case we just return the
   * <code>right</code> cell.
   * @return A cell that sorts between <code>left</code> and <code>right</code>.
   */
  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
    final ExtendedCell right) {
    if (right == null) {
      throw new IllegalArgumentException("right cell can not be null");
    }
    if (left == null) {
      return right;
    }
    // If Cells from meta table, don't mess around. meta table Cells have schema
    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
    // out without trying to do this optimization.
    if (comparator instanceof MetaCellComparator) {
      return right;
    }
    byte[] midRow;
    boolean bufferBacked =
      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
    } else {
      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
        right.getRowArray(), right.getRowOffset(), right.getRowLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRow(midRow);
    }
    // Rows are same. Compare on families.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
    } else {
      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
        right.getFamilyLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
    }
    // Families are same. Compare on qualifiers.
    if (bufferBacked) {
      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
    } else {
      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
        right.getQualifierLength());
    }
    if (midRow != null) {
      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
    }
    // No opportunity for optimization. Just return right key.
    return right;
  }

  /**
   * Try to get a byte array that falls between left and right, as short as possible in
   * lexicographical order.
   * @return a new, minimally sized array that sorts between left and right, or null if
   *         left == right
   */
  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      byte leftByte = leftArray[leftOffset + diffIdx];
      byte rightByte = rightArray[rightOffset + diffIdx];
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }

  /**
   * Try to create a new byte array that falls between left and right, as short as possible in
   * lexicographical order.
   * @return a new, minimally sized array that sorts between left and right, or null if
   *         left == right
   */
  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
    ByteBuffer right, int rightOffset, int rightLength) {
    int minLength = leftLength < rightLength ? leftLength : rightLength;
    int diffIdx = 0;
    for (; diffIdx < minLength; diffIdx++) {
      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
      if ((leftByte & 0xff) > (rightByte & 0xff)) {
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftByte != rightByte) {
        break;
      }
    }
    if (diffIdx == minLength) {
      if (leftLength > rightLength) {
        // right is prefix of left
        throw new IllegalArgumentException("Left byte array sorts after right row; left="
          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
      } else if (leftLength < rightLength) {
        // left is prefix of right.
        byte[] minimumMidpointArray = new byte[minLength + 1];
        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
          minLength + 1);
        minimumMidpointArray[minLength] = 0x00;
        return minimumMidpointArray;
      } else {
        // left == right
        return null;
      }
    }
    // Note that left[diffIdx] can never be equal to 0xff since left < right
    byte[] minimumMidpointArray = new byte[diffIdx + 1];
    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
    return minimumMidpointArray;
  }
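
  // Illustrative note (added for clarity, not in the original source): getMinimumMidpointArray
  // returns the shortest separator it can. For example, left="the" and right="this" first differ
  // at index 2, so the result is {'t','h','f'} ("the" < "thf" <= "this"); when left="abc" is a
  // prefix of right="abcdef" the result is {'a','b','c',0x00}; and when left equals right the
  // result is null, in which case getMidpoint() moves on to the family and then the qualifier.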

  /** Gives inline block writers an opportunity to contribute blocks. */
  private void writeInlineBlocks(boolean closing) throws IOException {
    for (InlineBlockWriter ibw : inlineBlockWriters) {
      while (ibw.shouldWriteBlock(closing)) {
        long offset = outputStream.getPos();
        boolean cacheThisBlock = ibw.getCacheOnWrite();
        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
        blockWriter.writeHeaderAndData(outputStream);
        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
          blockWriter.getUncompressedSizeWithoutHeader());
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        if (cacheThisBlock) {
          doCacheOnWrite(offset);
        }
      }
    }
  }

  /**
   * Caches the last written HFile block.
   * @param offset the offset of the block we want to cache. Used to determine the cache key.
   */
  private void doCacheOnWrite(long offset) {
    cacheConf.getBlockCache().ifPresent(cache -> {
      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
      try {
        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
      } finally {
        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
        cacheFormatBlock.release();
      }
    });
  }

  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
    if (path != null) {
      return new BlockCacheKey(path, offset, true, blockType);
    }
    return new BlockCacheKey(name, offset, true, blockType);
  }

  /**
   * Ready a new block for writing.
   */
  protected void newBlock() throws IOException {
    // This is where the next block begins.
    blockWriter.startWriting(BlockType.DATA);
    firstCellInBlock = null;
    if (lastCell != null) {
      lastCellOfPreviousBlock = lastCell;
    }
  }

  /**
   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
   * Fill one with a bunch of serialized data rather than do a metadata block per metadata
   * instance. If metadata is small, consider adding it to the file info using
   * {@link #appendFileInfo(byte[], byte[])} instead.
   * @param metaBlockName name of the block
   * @param content       will have readFields called on it to get the data back later
   *                      (DO NOT REUSE)
   */
  @Override
  public void appendMetaBlock(String metaBlockName, Writable content) {
    byte[] key = Bytes.toBytes(metaBlockName);
    int i;
    for (i = 0; i < metaNames.size(); ++i) {
      // stop when the current key is greater than our own
      byte[] cur = metaNames.get(i);
      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
        break;
      }
    }
    metaNames.add(i, key);
    metaData.add(i, content);
  }

  @Override
  public void close() throws IOException {
    if (outputStream == null) {
      return;
    }
    // Save data block encoder metadata in the file info.
    blockEncoder.saveMetadata(this);
    // Save index block encoder metadata in the file info.
    indexBlockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks, followed by
    // fileinfo, data block index and meta block index.

    finishBlock();
    writeInlineBlocks(true);

    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());

    // Write out the metadata blocks if any.
    if (!metaNames.isEmpty()) {
      for (int i = 0; i < metaNames.size(); ++i) {
        // store the beginning offset
        long offset = outputStream.getPos();
        // write the metadata content
        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
        metaData.get(i).write(dos);

        blockWriter.writeHeaderAndData(outputStream);
        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

        // Add the new meta block to the meta index.
        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          blockWriter.getOnDiskSizeWithHeader());
      }
    }

    // Load-on-open section.

    // Data block index.
    //
    // In version 2, this section of the file starts with the root level data
    // block index. We call a function that writes intermediate-level blocks
    // first, then root level, and returns the offset of the root level block
    // index.

    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
    trailer.setLoadOnOpenOffset(rootIndexOffset);

    // Meta block index.
    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
      "meta");
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    if (this.hFileContext.isIncludesMvcc()) {
      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
    }

    // File info
    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
    blockWriter.writeHeaderAndData(outputStream);
    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();

    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
    for (BlockWritable w : additionalLoadOnOpenData) {
      blockWriter.writeBlock(w, outputStream);
      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
    }

    // Now finish off the trailer.
    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
    trailer.setLastDataBlockOffset(lastDataBlockOffset);
    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());

    finishClose(trailer);

    blockWriter.release();
  }

  @Override
  public void addInlineBlockWriter(InlineBlockWriter ibw) {
    inlineBlockWriters.add(ibw);
  }

  @Override
  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
  }

  @Override
  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
  }

  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
    if (bfw.getKeyCount() <= 0) {
      return;
    }

    if (
      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
    }
    additionalLoadOnOpenData.add(new BlockWritable() {
      @Override
      public BlockType getBlockType() {
        return blockType;
      }

      @Override
      public void writeToBlock(DataOutput out) throws IOException {
        bfw.getMetaWriter().write(out);
        Writable dataWriter = bfw.getDataWriter();
        if (dataWriter != null) {
          dataWriter.write(out);
        }
      }
    });
  }

  @Override
  public HFileContext getFileContext() {
    return hFileContext;
  }

  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction.
   * @param cell Cell to add. Cannot be empty nor null.
   */
  @Override
  public void append(final ExtendedCell cell) throws IOException {
    // checkKey uses comparator to check we are writing in order.
    boolean dupKey = checkKey(cell);
    if (!dupKey) {
      checkBlockBoundary();
    }

    if (!blockWriter.isWriting()) {
      newBlock();
    }

    blockWriter.write(cell);

    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    totalValueLength += cell.getValueLength();
    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
    }
    // Are we the first key in this block?
    if (firstCellInBlock == null) {
      // If cell is big, block will be closed and this firstCellInBlock reference will only last
      // a short while.
      firstCellInBlock = cell;
    }

    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
    lastCell = cell;
    entryCount++;
    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
    int tagsLength = cell.getTagsLength();
    if (tagsLength > this.maxTagsLength) {
      this.maxTagsLength = tagsLength;
    }
  }

  @Override
  public void beforeShipped() throws IOException {
    this.blockWriter.beforeShipped();
    // Replace the cell references with key-only copies so we do not hold on to the shipped
    // cells' backing buffers.
    if (this.lastCell != null) {
      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
    }
    if (this.firstCellInBlock != null) {
      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
    }
    if (this.lastCellOfPreviousBlock != null) {
      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
    }
  }

  public Cell getLastCell() {
    return lastCell;
  }

  protected void finishFileInfo() throws IOException {
    if (lastCell != null) {
      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
      // byte buffer. Won't take a tuple.
      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
    }

    // Average key length.
    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
      false);

    // Average value length.
    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);

    // Biggest cell.
    if (keyOfBiggestCell != null) {
      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
    }

    if (hFileContext.isIncludesTags()) {
      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
      // from the FileInfo
      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
        && hFileContext.isCompressTags();
      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
    }
  }

  protected int getMajorVersion() {
    return 3;
  }

  protected int getMinorVersion() {
    return HFileReaderImpl.MAX_MINOR_VERSION;
  }

  protected void finishClose(FixedFileTrailer trailer) throws IOException {
    // Write out encryption metadata before finalizing if we have a valid crypto context
    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata; the wrapper includes
      // all information needed for decryption
      trailer.setEncryptionKey(EncryptionUtil.wrapKey(
        cryptoContext.getConf(), cryptoContext.getConf()
          .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()),
        cryptoContext.getKey()));
    }
    // Now we can finish the close
    trailer.setMetaIndexCount(metaNames.size());
    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
    trailer.setEntryCount(entryCount);
    trailer.setCompressionCodec(hFileContext.getCompression());

    long startTime = EnvironmentEdgeManager.currentTime();
    trailer.serialize(outputStream);
    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);

    if (closeOutputStream) {
      outputStream.close();
      outputStream = null;
    }
  }
}