001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
021
022import java.io.DataOutput;
023import java.io.DataOutputStream;
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FSDataOutputStream;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.fs.permission.FsPermission;
034import org.apache.hadoop.hbase.ByteBufferExtendedCell;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellComparator;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.ExtendedCell;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValueUtil;
042import org.apache.hadoop.hbase.MetaCellComparator;
043import org.apache.hadoop.hbase.PrivateCellUtil;
044import org.apache.hadoop.hbase.io.compress.Compression;
045import org.apache.hadoop.hbase.io.crypto.Encryption;
046import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
047import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
048import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
049import org.apache.hadoop.hbase.security.EncryptionUtil;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.BloomFilterWriter;
052import org.apache.hadoop.hbase.util.ByteBufferUtils;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.FSUtils;
057import org.apache.hadoop.io.Writable;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * Common functionality needed by all versions of {@link HFile} writers.
064 */
065@InterfaceAudience.Private
066public class HFileWriterImpl implements HFile.Writer {
067  private static final Logger LOG = LoggerFactory.getLogger(HFileWriterImpl.class);
068
069  private static final long UNSET = -1;
070
  /** If enabled, pre-calculate the encoded data size before the real encoding happens. */
072  public static final String UNIFIED_ENCODED_BLOCKSIZE_RATIO =
073    "hbase.writer.unified.encoded.blocksize.ratio";
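  // A hedged configuration sketch (the 1.0f value below is illustrative, not a default; the
  // default of 0 disables the feature). On the Configuration passed to the writer:
  //   conf.setFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 1.0f);
  // With this set, checkBlockBoundary() finishes a block once the encoded size reaches
  // blocksize * ratio, so encoded blocks cached on write end up roughly uniform in size.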
074
075  /** Block size limit after encoding, used to unify encoded block Cache entry size */
076  private final int encodedBlockSizeLimit;
077
078  /** The Cell previously appended. Becomes the last cell in the file. */
079  protected ExtendedCell lastCell = null;
080
081  /** FileSystem stream to write into. */
082  protected FSDataOutputStream outputStream;
083
084  /** True if we opened the <code>outputStream</code> (and so will close it). */
085  protected final boolean closeOutputStream;
086
087  /** A "file info" block: a key-value map of file-wide metadata. */
088  protected HFileInfo fileInfo = new HFileInfo();
089
  /** Total # of key/value entries, i.e. how many times append() was called. */
091  protected long entryCount = 0;
092
093  /** Used for calculating the average key length. */
094  protected long totalKeyLength = 0;
095
096  /** Used for calculating the average value length. */
097  protected long totalValueLength = 0;
098
099  /** Len of the biggest cell. */
100  protected long lenOfBiggestCell = 0;
101  /** Key of the biggest cell. */
102  protected byte[] keyOfBiggestCell;
103
  /** Total uncompressed bytes; may be used to calculate a compression ratio later. */
105  protected long totalUncompressedBytes = 0;
106
107  /** Meta block names. */
108  protected List<byte[]> metaNames = new ArrayList<>();
109
110  /** {@link Writable}s representing meta block data. */
111  protected List<Writable> metaData = new ArrayList<>();
112
113  /**
114   * First cell in a block. This reference should be short-lived since we write hfiles in a burst.
115   */
116  protected ExtendedCell firstCellInBlock = null;
117
118  /** May be null if we were passed a stream. */
119  protected final Path path;
120
121  /** Cache configuration for caching data on write. */
122  protected final CacheConfig cacheConf;
123
124  /**
125   * Name for this object used when logging or in toString. Is either the result of a toString on
126   * stream or else name of passed file Path.
127   */
128  protected final String name;
129
130  /**
131   * The data block encoding which will be used. {@link NoOpDataBlockEncoder#INSTANCE} if there is
132   * no encoding.
133   */
134  protected final HFileDataBlockEncoder blockEncoder;
135
136  protected final HFileIndexBlockEncoder indexBlockEncoder;
137
138  protected final HFileContext hFileContext;
139
140  private int maxTagsLength = 0;
141
142  /** KeyValue version in FileInfo */
143  public static final byte[] KEY_VALUE_VERSION = Bytes.toBytes("KEY_VALUE_VERSION");
144
145  /** Version for KeyValue which includes memstore timestamp */
146  public static final int KEY_VALUE_VER_WITH_MEMSTORE = 1;
147
148  /** Inline block writers for multi-level block index and compound Blooms. */
149  private List<InlineBlockWriter> inlineBlockWriters = new ArrayList<>();
150
151  /** block writer */
152  protected HFileBlock.Writer blockWriter;
153
154  private HFileBlockIndex.BlockIndexWriter dataBlockIndexWriter;
155  private HFileBlockIndex.BlockIndexWriter metaBlockIndexWriter;
156
157  /** The offset of the first data block or -1 if the file is empty. */
158  private long firstDataBlockOffset = UNSET;
159
  /** The offset of the last data block or -1 if the file is empty. */
161  protected long lastDataBlockOffset = UNSET;
162
163  /**
164   * The last(stop) Cell of the previous data block. This reference should be short-lived since we
165   * write hfiles in a burst.
166   */
167  private ExtendedCell lastCellOfPreviousBlock = null;
168
169  /** Additional data items to be written to the "load-on-open" section. */
170  private List<BlockWritable> additionalLoadOnOpenData = new ArrayList<>();
171
172  protected long maxMemstoreTS = 0;
173
174  public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
175    FSDataOutputStream outputStream, HFileContext fileContext) {
176    this.outputStream = outputStream;
177    this.path = path;
178    this.name = path != null ? path.getName() : outputStream.toString();
179    this.hFileContext = fileContext;
180    DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
181    if (encoding != DataBlockEncoding.NONE) {
182      this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
183    } else {
184      this.blockEncoder = NoOpDataBlockEncoder.INSTANCE;
185    }
186    IndexBlockEncoding indexBlockEncoding = hFileContext.getIndexBlockEncoding();
187    if (indexBlockEncoding != IndexBlockEncoding.NONE) {
188      this.indexBlockEncoder = new HFileIndexBlockEncoderImpl(indexBlockEncoding);
189    } else {
190      this.indexBlockEncoder = NoOpIndexBlockEncoder.INSTANCE;
191    }
192    closeOutputStream = path != null;
193    this.cacheConf = cacheConf;
194    float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
195    this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);
196
197    finishInit(conf);
198    if (LOG.isTraceEnabled()) {
199      LOG.trace("Writer" + (path != null ? " for " + path : "") + " initialized with cacheConf: "
200        + cacheConf + " fileContext: " + fileContext);
201    }
202  }
203
204  /**
205   * Add to the file info. All added key/value pairs can be obtained using
206   * {@link HFile.Reader#getHFileInfo()}.
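   * <pre>{@code
   * // A minimal usage sketch; the key below is hypothetical (keys carrying the reserved
   * // internal prefix are rejected).
   * writer.appendFileInfo(Bytes.toBytes("MY_METADATA_KEY"), Bytes.toBytes("my value"));
   * }</pre>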
207   * @param k Key
208   * @param v Value
   * @throws IOException in case the key or the value is invalid
210   */
211  @Override
212  public void appendFileInfo(final byte[] k, final byte[] v) throws IOException {
213    fileInfo.append(k, v, true);
214  }
215
216  /**
217   * Sets the file info offset in the trailer, finishes up populating fields in the file info, and
218   * writes the file info into the given data output. The reason the data output is not always
219   * {@link #outputStream} is that we store file info as a block in version 2.
220   * @param trailer fixed file trailer
221   * @param out     the data output to write the file info to
222   */
223  protected final void writeFileInfo(FixedFileTrailer trailer, DataOutputStream out)
224    throws IOException {
225    trailer.setFileInfoOffset(outputStream.getPos());
226    finishFileInfo();
227    long startTime = EnvironmentEdgeManager.currentTime();
228    fileInfo.write(out);
229    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
230  }
231
  public long getPos() throws IOException {
    return outputStream.getPos();
  }
236
237  /**
238   * Checks that the given Cell's key does not violate the key order.
239   * @param cell Cell whose key to check.
240   * @return true if the key is duplicate
241   * @throws IOException if the key or the key order is wrong
242   */
243  protected boolean checkKey(final Cell cell) throws IOException {
244    boolean isDuplicateKey = false;
245
246    if (cell == null) {
247      throw new IOException("Key cannot be null or empty");
248    }
249    if (lastCell != null) {
250      int keyComp = PrivateCellUtil.compareKeyIgnoresMvcc(this.hFileContext.getCellComparator(),
251        lastCell, cell);
252      if (keyComp > 0) {
253        String message = getLexicalErrorMessage(cell);
254        throw new IOException(message);
255      } else if (keyComp == 0) {
256        isDuplicateKey = true;
257      }
258    }
259    return isDuplicateKey;
260  }
261
262  private String getLexicalErrorMessage(Cell cell) {
263    StringBuilder sb = new StringBuilder();
264    sb.append("Added a key not lexically larger than previous. Current cell = ");
265    sb.append(cell);
266    sb.append(", lastCell = ");
267    sb.append(lastCell);
268    // file context includes HFile path and optionally table and CF of file being written
    sb.append(", fileContext=");
270    sb.append(hFileContext);
271    return sb.toString();
272  }
273
274  /** Checks the given value for validity. */
275  protected void checkValue(final byte[] value, final int offset, final int length)
276    throws IOException {
277    if (value == null) {
278      throw new IOException("Value cannot be null");
279    }
280  }
281
282  /** Returns Path or null if we were passed a stream rather than a Path. */
283  @Override
284  public Path getPath() {
285    return path;
286  }
287
288  @Override
289  public String toString() {
290    return "writer=" + (path != null ? path.toString() : null) + ", name=" + name + ", compression="
291      + hFileContext.getCompression().getName();
292  }
293
294  public static Compression.Algorithm compressionByName(String algoName) {
295    if (algoName == null) {
296      return HFile.DEFAULT_COMPRESSION_ALGORITHM;
297    }
298    return Compression.getCompressionAlgorithmByName(algoName);
299  }
300
301  /** A helper method to create HFile output streams in constructors */
302  protected static FSDataOutputStream createOutputStream(Configuration conf, FileSystem fs,
303    Path path, InetSocketAddress[] favoredNodes) throws IOException {
304    FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
305    return FSUtils.create(conf, fs, path, perms, favoredNodes);
306  }
307
308  /** Additional initialization steps */
309  protected void finishInit(final Configuration conf) {
310    if (blockWriter != null) {
311      throw new IllegalStateException("finishInit called twice");
312    }
313    blockWriter =
314      new HFileBlock.Writer(conf, blockEncoder, hFileContext, cacheConf.getByteBuffAllocator(),
315        conf.getInt(MAX_BLOCK_SIZE_UNCOMPRESSED, hFileContext.getBlocksize() * 10));
316    // Data block index writer
317    boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
318    dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
319      cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null, indexBlockEncoder);
320    dataBlockIndexWriter.setMaxChunkSize(HFileBlockIndex.getMaxChunkSize(conf));
321    dataBlockIndexWriter.setMinIndexNumEntries(HFileBlockIndex.getMinIndexNumEntries(conf));
322    inlineBlockWriters.add(dataBlockIndexWriter);
323
324    // Meta data block index writer
325    metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
326    LOG.trace("Initialized with {}", cacheConf);
327  }
328
329  /**
   * At a block boundary, writes out all the inline blocks and opens a new block.
331   */
332  protected void checkBlockBoundary() throws IOException {
333    boolean shouldFinishBlock = false;
334    // This means hbase.writer.unified.encoded.blocksize.ratio was set to something different from 0
335    // and we should use the encoding ratio
336    if (encodedBlockSizeLimit > 0) {
337      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= encodedBlockSizeLimit;
338    } else {
339      shouldFinishBlock = blockWriter.encodedBlockSizeWritten() >= hFileContext.getBlocksize()
340        || blockWriter.blockSizeWritten() >= hFileContext.getBlocksize();
341    }
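    // Worked example (illustrative numbers): with a 64 KB block size and a ratio of 0.5,
    // encodedBlockSizeLimit is 32 KB and the block is finished once 32 KB of encoded data has
    // been written; with the ratio left at its default of 0, the block is finished when either
    // the encoded or the unencoded size reaches 64 KB. Either way, the decision is still subject
    // to the predicator check below.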
342    shouldFinishBlock &= blockWriter.checkBoundariesWithPredicate();
343    if (shouldFinishBlock) {
344      finishBlock();
345      writeInlineBlocks(false);
346      newBlock();
347    }
348  }
349
350  /** Clean up the data block that is currently being written. */
351  private void finishBlock() throws IOException {
352    if (!blockWriter.isWriting() || blockWriter.blockSizeWritten() == 0) {
353      return;
354    }
355
    // Update the first data block offset if UNSET; used when scanning.
357    if (firstDataBlockOffset == UNSET) {
358      firstDataBlockOffset = outputStream.getPos();
359    }
360    // Update the last data block offset each time through here.
361    lastDataBlockOffset = outputStream.getPos();
362    blockWriter.writeHeaderAndData(outputStream);
363    int onDiskSize = blockWriter.getOnDiskSizeWithHeader();
364    ExtendedCell indexEntry =
365      getMidpoint(this.hFileContext.getCellComparator(), lastCellOfPreviousBlock, firstCellInBlock);
366    dataBlockIndexWriter.addEntry(PrivateCellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
367      lastDataBlockOffset, onDiskSize);
368    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
369    if (cacheConf.shouldCacheDataOnWrite()) {
370      doCacheOnWrite(lastDataBlockOffset);
371    }
372  }
373
374  /**
375   * Try to return a Cell that falls between <code>left</code> and <code>right</code> but that is
   * shorter, i.e. takes up less space. This trick is used when building the HFile block index.
   * It is an optimization; it does not always work, and in that case we just return the
   * <code>right</code> cell.
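   * <p>
   * An illustrative example: if the last cell of the previous block has row
   * {@code "the quick brown fox"} and the first cell of the new block has row
   * {@code "the who"}, the midpoint returned is a fake first-on-row cell for row
   * {@code "the r"}, which is shorter than either row yet still sorts between them.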
379   * @return A cell that sorts between <code>left</code> and <code>right</code>.
380   */
381  public static ExtendedCell getMidpoint(final CellComparator comparator, final ExtendedCell left,
382    final ExtendedCell right) {
383    if (right == null) {
384      throw new IllegalArgumentException("right cell can not be null");
385    }
386    if (left == null) {
387      return right;
388    }
389    // If Cells from meta table, don't mess around. meta table Cells have schema
390    // (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
391    // out without trying to do this optimization.
392    if (comparator instanceof MetaCellComparator) {
393      return right;
394    }
395    byte[] midRow;
396    boolean bufferBacked =
397      left instanceof ByteBufferExtendedCell && right instanceof ByteBufferExtendedCell;
398    if (bufferBacked) {
399      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getRowByteBuffer(),
400        ((ByteBufferExtendedCell) left).getRowPosition(), left.getRowLength(),
401        ((ByteBufferExtendedCell) right).getRowByteBuffer(),
402        ((ByteBufferExtendedCell) right).getRowPosition(), right.getRowLength());
403    } else {
404      midRow = getMinimumMidpointArray(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
405        right.getRowArray(), right.getRowOffset(), right.getRowLength());
406    }
407    if (midRow != null) {
408      return PrivateCellUtil.createFirstOnRow(midRow);
409    }
410    // Rows are same. Compare on families.
411    if (bufferBacked) {
412      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getFamilyByteBuffer(),
413        ((ByteBufferExtendedCell) left).getFamilyPosition(), left.getFamilyLength(),
414        ((ByteBufferExtendedCell) right).getFamilyByteBuffer(),
415        ((ByteBufferExtendedCell) right).getFamilyPosition(), right.getFamilyLength());
416    } else {
417      midRow = getMinimumMidpointArray(left.getFamilyArray(), left.getFamilyOffset(),
418        left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
419        right.getFamilyLength());
420    }
421    if (midRow != null) {
422      return PrivateCellUtil.createFirstOnRowFamily(right, midRow, 0, midRow.length);
423    }
424    // Families are same. Compare on qualifiers.
425    if (bufferBacked) {
426      midRow = getMinimumMidpointArray(((ByteBufferExtendedCell) left).getQualifierByteBuffer(),
427        ((ByteBufferExtendedCell) left).getQualifierPosition(), left.getQualifierLength(),
428        ((ByteBufferExtendedCell) right).getQualifierByteBuffer(),
429        ((ByteBufferExtendedCell) right).getQualifierPosition(), right.getQualifierLength());
430    } else {
431      midRow = getMinimumMidpointArray(left.getQualifierArray(), left.getQualifierOffset(),
432        left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
433        right.getQualifierLength());
434    }
435    if (midRow != null) {
436      return PrivateCellUtil.createFirstOnRowCol(right, midRow, 0, midRow.length);
437    }
438    // No opportunity for optimization. Just return right key.
439    return right;
440  }
441
442  /**
   * Try to get a byte array that falls between left and right and is as short as possible in
   * lexicographic order.
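   * <p>
   * Illustrative examples: for left {@code "aaabc"} and right {@code "aad"} the result is
   * {@code "aab"}; for left {@code "aaa"} and right {@code "aaabb"} (left is a prefix of right)
   * the result is {@code "aaa"} followed by a single {@code 0x00} byte.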
   * @return a new, minimally sized array that sorts between left and right, or null if
   *         left == right.
447   */
448  private static byte[] getMinimumMidpointArray(final byte[] leftArray, final int leftOffset,
449    final int leftLength, final byte[] rightArray, final int rightOffset, final int rightLength) {
450    int minLength = leftLength < rightLength ? leftLength : rightLength;
451    int diffIdx = 0;
452    for (; diffIdx < minLength; diffIdx++) {
453      byte leftByte = leftArray[leftOffset + diffIdx];
454      byte rightByte = rightArray[rightOffset + diffIdx];
455      if ((leftByte & 0xff) > (rightByte & 0xff)) {
456        throw new IllegalArgumentException("Left byte array sorts after right row; left="
457          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
458          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
459      } else if (leftByte != rightByte) {
460        break;
461      }
462    }
463    if (diffIdx == minLength) {
464      if (leftLength > rightLength) {
465        // right is prefix of left
466        throw new IllegalArgumentException("Left byte array sorts after right row; left="
467          + Bytes.toStringBinary(leftArray, leftOffset, leftLength) + ", right="
468          + Bytes.toStringBinary(rightArray, rightOffset, rightLength));
469      } else if (leftLength < rightLength) {
470        // left is prefix of right.
471        byte[] minimumMidpointArray = new byte[minLength + 1];
472        System.arraycopy(rightArray, rightOffset, minimumMidpointArray, 0, minLength + 1);
473        minimumMidpointArray[minLength] = 0x00;
474        return minimumMidpointArray;
475      } else {
476        // left == right
477        return null;
478      }
479    }
480    // Note that left[diffIdx] can never be equal to 0xff since left < right
481    byte[] minimumMidpointArray = new byte[diffIdx + 1];
482    System.arraycopy(leftArray, leftOffset, minimumMidpointArray, 0, diffIdx + 1);
483    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
484    return minimumMidpointArray;
485  }
486
487  /**
   * Try to create a new byte array that falls between left and right and is as short as
   * possible in lexicographic order.
   * @return a new, minimally sized array that sorts between left and right, or null if
   *         left == right.
492   */
493  private static byte[] getMinimumMidpointArray(ByteBuffer left, int leftOffset, int leftLength,
494    ByteBuffer right, int rightOffset, int rightLength) {
495    int minLength = leftLength < rightLength ? leftLength : rightLength;
496    int diffIdx = 0;
497    for (; diffIdx < minLength; diffIdx++) {
498      int leftByte = ByteBufferUtils.toByte(left, leftOffset + diffIdx);
499      int rightByte = ByteBufferUtils.toByte(right, rightOffset + diffIdx);
500      if ((leftByte & 0xff) > (rightByte & 0xff)) {
501        throw new IllegalArgumentException("Left byte array sorts after right row; left="
502          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
503          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
504      } else if (leftByte != rightByte) {
505        break;
506      }
507    }
508    if (diffIdx == minLength) {
509      if (leftLength > rightLength) {
510        // right is prefix of left
511        throw new IllegalArgumentException("Left byte array sorts after right row; left="
512          + ByteBufferUtils.toStringBinary(left, leftOffset, leftLength) + ", right="
513          + ByteBufferUtils.toStringBinary(right, rightOffset, rightLength));
514      } else if (leftLength < rightLength) {
515        // left is prefix of right.
516        byte[] minimumMidpointArray = new byte[minLength + 1];
517        ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, right, rightOffset, 0,
518          minLength + 1);
519        minimumMidpointArray[minLength] = 0x00;
520        return minimumMidpointArray;
521      } else {
522        // left == right
523        return null;
524      }
525    }
526    // Note that left[diffIdx] can never be equal to 0xff since left < right
527    byte[] minimumMidpointArray = new byte[diffIdx + 1];
528    ByteBufferUtils.copyFromBufferToArray(minimumMidpointArray, left, leftOffset, 0, diffIdx + 1);
529    minimumMidpointArray[diffIdx] = (byte) (minimumMidpointArray[diffIdx] + 1);
530    return minimumMidpointArray;
531  }
532
533  /** Gives inline block writers an opportunity to contribute blocks. */
534  private void writeInlineBlocks(boolean closing) throws IOException {
535    for (InlineBlockWriter ibw : inlineBlockWriters) {
536      while (ibw.shouldWriteBlock(closing)) {
537        long offset = outputStream.getPos();
538        boolean cacheThisBlock = ibw.getCacheOnWrite();
539        ibw.writeInlineBlock(blockWriter.startWriting(ibw.getInlineBlockType()));
540        blockWriter.writeHeaderAndData(outputStream);
541        ibw.blockWritten(offset, blockWriter.getOnDiskSizeWithHeader(),
542          blockWriter.getUncompressedSizeWithoutHeader());
543        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
544
545        if (cacheThisBlock) {
546          doCacheOnWrite(offset);
547        }
548      }
549    }
550  }
551
552  /**
553   * Caches the last written HFile block.
554   * @param offset the offset of the block we want to cache. Used to determine the cache key.
555   */
556  private void doCacheOnWrite(long offset) {
557    cacheConf.getBlockCache().ifPresent(cache -> {
558      HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
559      BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
560      try {
561        cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
562      } finally {
        // refCnt will automatically increase when the block is added to the cache;
        // see RAMCache#putIfAbsent
564        cacheFormatBlock.release();
565      }
566    });
567  }
568
569  private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
570    if (path != null) {
571      return new BlockCacheKey(path, offset, true, blockType);
572    }
573    return new BlockCacheKey(name, offset, true, blockType);
574  }
575
576  /**
577   * Ready a new block for writing.
578   */
579  protected void newBlock() throws IOException {
580    // This is where the next block begins.
581    blockWriter.startWriting(BlockType.DATA);
582    firstCellInBlock = null;
583    if (lastCell != null) {
584      lastCellOfPreviousBlock = lastCell;
585    }
586  }
587
588  /**
589   * Add a meta block to the end of the file. Call before close(). Metadata blocks are expensive.
590   * Fill one with a bunch of serialized data rather than do a metadata block per metadata instance.
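   * <pre>{@code
   * // A hypothetical usage sketch: pack several serialized items into one Writable and
   * // append it once rather than writing one meta block per item.
   * writer.appendMetaBlock("MY_AGGREGATED_META", aggregatedWritable);
   * }</pre>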
   * If metadata is small, consider adding to file info using
   * {@link #appendFileInfo(byte[], byte[])}.
   * @param metaBlockName name of the block
   * @param content       will call readFields to get data later (DO NOT REUSE)
594   */
595  @Override
596  public void appendMetaBlock(String metaBlockName, Writable content) {
597    byte[] key = Bytes.toBytes(metaBlockName);
598    int i;
599    for (i = 0; i < metaNames.size(); ++i) {
600      // stop when the current key is greater than our own
601      byte[] cur = metaNames.get(i);
602      if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length, key, 0, key.length) > 0) {
603        break;
604      }
605    }
606    metaNames.add(i, key);
607    metaData.add(i, content);
608  }
609
610  @Override
611  public void close() throws IOException {
612    if (outputStream == null) {
613      return;
614    }
615    // Save data block encoder metadata in the file info.
616    blockEncoder.saveMetadata(this);
617    // Save index block encoder metadata in the file info.
618    indexBlockEncoder.saveMetadata(this);
    // Write out the end of the data blocks, then write the meta data blocks,
    // followed by the fileinfo, the data block index and the meta block index.
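    // A sketch of the resulting write order (mirrors the calls below): any partially written
    // data block and remaining inline blocks are flushed first, then the meta blocks, then the
    // remaining data block index levels ending with the root index (where the load-on-open
    // section begins), the meta block index, the file info block, additional load-on-open data
    // such as Bloom filter metadata, and finally the fixed file trailer.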
621
622    finishBlock();
623    writeInlineBlocks(true);
624
625    FixedFileTrailer trailer = new FixedFileTrailer(getMajorVersion(), getMinorVersion());
626
627    // Write out the metadata blocks if any.
628    if (!metaNames.isEmpty()) {
629      for (int i = 0; i < metaNames.size(); ++i) {
630        // store the beginning offset
631        long offset = outputStream.getPos();
632        // write the metadata content
633        DataOutputStream dos = blockWriter.startWriting(BlockType.META);
634        metaData.get(i).write(dos);
635
636        blockWriter.writeHeaderAndData(outputStream);
637        totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
638
639        // Add the new meta block to the meta index.
640        metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
641          blockWriter.getOnDiskSizeWithHeader());
642      }
643    }
644
645    // Load-on-open section.
646
647    // Data block index.
648    //
649    // In version 2, this section of the file starts with the root level data
650    // block index. We call a function that writes intermediate-level blocks
651    // first, then root level, and returns the offset of the root level block
652    // index.
653
654    long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
655    trailer.setLoadOnOpenOffset(rootIndexOffset);
656
657    // Meta block index.
658    metaBlockIndexWriter.writeSingleLevelIndex(blockWriter.startWriting(BlockType.ROOT_INDEX),
659      "meta");
660    blockWriter.writeHeaderAndData(outputStream);
661    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
662
663    if (this.hFileContext.isIncludesMvcc()) {
664      appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
665      appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
666    }
667
668    // File info
669    writeFileInfo(trailer, blockWriter.startWriting(BlockType.FILE_INFO));
670    blockWriter.writeHeaderAndData(outputStream);
671    totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
672
673    // Load-on-open data supplied by higher levels, e.g. Bloom filters.
674    for (BlockWritable w : additionalLoadOnOpenData) {
675      blockWriter.writeBlock(w, outputStream);
676      totalUncompressedBytes += blockWriter.getUncompressedSizeWithHeader();
677    }
678
679    // Now finish off the trailer.
680    trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
681    trailer.setUncompressedDataIndexSize(dataBlockIndexWriter.getTotalUncompressedSize());
682    trailer.setFirstDataBlockOffset(firstDataBlockOffset);
683    trailer.setLastDataBlockOffset(lastDataBlockOffset);
684    trailer.setComparatorClass(this.hFileContext.getCellComparator().getClass());
685    trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
686
687    finishClose(trailer);
688
689    blockWriter.release();
690  }
691
692  @Override
693  public void addInlineBlockWriter(InlineBlockWriter ibw) {
694    inlineBlockWriters.add(ibw);
695  }
696
697  @Override
698  public void addGeneralBloomFilter(final BloomFilterWriter bfw) {
699    this.addBloomFilter(bfw, BlockType.GENERAL_BLOOM_META);
700  }
701
702  @Override
703  public void addDeleteFamilyBloomFilter(final BloomFilterWriter bfw) {
704    this.addBloomFilter(bfw, BlockType.DELETE_FAMILY_BLOOM_META);
705  }
706
707  private void addBloomFilter(final BloomFilterWriter bfw, final BlockType blockType) {
708    if (bfw.getKeyCount() <= 0) {
709      return;
710    }
711
712    if (
713      blockType != BlockType.GENERAL_BLOOM_META && blockType != BlockType.DELETE_FAMILY_BLOOM_META
714    ) {
      throw new RuntimeException("Block Type: " + blockType.toString() + " is not supported");
716    }
717    additionalLoadOnOpenData.add(new BlockWritable() {
718      @Override
719      public BlockType getBlockType() {
720        return blockType;
721      }
722
723      @Override
724      public void writeToBlock(DataOutput out) throws IOException {
725        bfw.getMetaWriter().write(out);
726        Writable dataWriter = bfw.getDataWriter();
727        if (dataWriter != null) {
728          dataWriter.write(out);
729        }
730      }
731    });
732  }
733
734  @Override
735  public HFileContext getFileContext() {
736    return hFileContext;
737  }
738
739  /**
   * Add key/value to file. Keys must be added in an order that agrees with the Comparator passed
   * on construction. The cell to add cannot be empty nor null.
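   * <pre>{@code
   * // A minimal sketch (KeyValue implements ExtendedCell); row1, family, qualifier and the
   * // values are placeholder byte[] variables, and rows must already be in the order defined
   * // by the comparator in the HFileContext.
   * writer.append(new KeyValue(row1, family, qualifier, value1));
   * writer.append(new KeyValue(row2, family, qualifier, value2));
   * }</pre>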
742   */
743  @Override
744  public void append(final ExtendedCell cell) throws IOException {
745    // checkKey uses comparator to check we are writing in order.
746    boolean dupKey = checkKey(cell);
747    if (!dupKey) {
748      checkBlockBoundary();
749    }
750
751    if (!blockWriter.isWriting()) {
752      newBlock();
753    }
754
755    blockWriter.write(cell);
756
757    totalKeyLength += PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
758    totalValueLength += cell.getValueLength();
759    if (lenOfBiggestCell < PrivateCellUtil.estimatedSerializedSizeOf(cell)) {
760      lenOfBiggestCell = PrivateCellUtil.estimatedSerializedSizeOf(cell);
761      keyOfBiggestCell = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(cell);
762    }
763    // Are we the first key in this block?
764    if (firstCellInBlock == null) {
765      // If cell is big, block will be closed and this firstCellInBlock reference will only last
766      // a short while.
767      firstCellInBlock = cell;
768    }
769
770    // TODO: What if cell is 10MB and we write infrequently? We hold on to cell here indefinitely?
771    lastCell = cell;
772    entryCount++;
773    this.maxMemstoreTS = Math.max(this.maxMemstoreTS, cell.getSequenceId());
774    int tagsLength = cell.getTagsLength();
775    if (tagsLength > this.maxTagsLength) {
776      this.maxTagsLength = tagsLength;
777    }
778  }
779
780  @Override
781  public void beforeShipped() throws IOException {
782    this.blockWriter.beforeShipped();
    // Clone retained cells so they no longer reference buffers reused after shipping
784    if (this.lastCell != null) {
785      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
786    }
787    if (this.firstCellInBlock != null) {
788      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
789    }
790    if (this.lastCellOfPreviousBlock != null) {
791      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
792    }
793  }
794
795  public Cell getLastCell() {
796    return lastCell;
797  }
798
799  protected void finishFileInfo() throws IOException {
800    if (lastCell != null) {
801      // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean
802      // byte buffer. Won't take a tuple.
803      byte[] lastKey = PrivateCellUtil.getCellKeySerializedAsKeyValueKey(this.lastCell);
804      fileInfo.append(HFileInfo.LASTKEY, lastKey, false);
805    }
806
807    // Average key length.
808    int avgKeyLen = entryCount == 0 ? 0 : (int) (totalKeyLength / entryCount);
809    fileInfo.append(HFileInfo.AVG_KEY_LEN, Bytes.toBytes(avgKeyLen), false);
810    fileInfo.append(HFileInfo.CREATE_TIME_TS, Bytes.toBytes(hFileContext.getFileCreateTime()),
811      false);
812
813    // Average value length.
814    int avgValueLen = entryCount == 0 ? 0 : (int) (totalValueLength / entryCount);
815    fileInfo.append(HFileInfo.AVG_VALUE_LEN, Bytes.toBytes(avgValueLen), false);
816
817    // Biggest cell.
818    if (keyOfBiggestCell != null) {
819      fileInfo.append(HFileInfo.KEY_OF_BIGGEST_CELL, keyOfBiggestCell, false);
820      fileInfo.append(HFileInfo.LEN_OF_BIGGEST_CELL, Bytes.toBytes(lenOfBiggestCell), false);
821      LOG.debug("Len of the biggest cell in {} is {}, key is {}",
822        this.getPath() == null ? "" : this.getPath().toString(), lenOfBiggestCell,
823        CellUtil.toString(new KeyValue.KeyOnlyKeyValue(keyOfBiggestCell), false));
824    }
825
826    if (hFileContext.isIncludesTags()) {
827      // When tags are not being written in this file, MAX_TAGS_LEN is excluded
828      // from the FileInfo
829      fileInfo.append(HFileInfo.MAX_TAGS_LEN, Bytes.toBytes(this.maxTagsLength), false);
830      boolean tagsCompressed = (hFileContext.getDataBlockEncoding() != DataBlockEncoding.NONE)
831        && hFileContext.isCompressTags();
832      fileInfo.append(HFileInfo.TAGS_COMPRESSED, Bytes.toBytes(tagsCompressed), false);
833    }
834  }
835
836  protected int getMajorVersion() {
837    return 3;
838  }
839
840  protected int getMinorVersion() {
841    return HFileReaderImpl.MAX_MINOR_VERSION;
842  }
843
844  protected void finishClose(FixedFileTrailer trailer) throws IOException {
845    // Write out encryption metadata before finalizing if we have a valid crypto context
846    Encryption.Context cryptoContext = hFileContext.getEncryptionContext();
847    if (cryptoContext != Encryption.Context.NONE) {
      // Wrap the context's key and write it as the encryption metadata; the wrapper includes
      // all information needed for decryption
850      trailer.setEncryptionKey(EncryptionUtil.wrapKey(
851        cryptoContext.getConf(), cryptoContext.getConf()
852          .get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()),
853        cryptoContext.getKey()));
854    }
855    // Now we can finish the close
856    trailer.setMetaIndexCount(metaNames.size());
857    trailer.setTotalUncompressedBytes(totalUncompressedBytes + trailer.getTrailerSize());
858    trailer.setEntryCount(entryCount);
859    trailer.setCompressionCodec(hFileContext.getCompression());
860
861    long startTime = EnvironmentEdgeManager.currentTime();
862    trailer.serialize(outputStream);
863    HFile.updateWriteLatency(EnvironmentEdgeManager.currentTime() - startTime);
864
865    if (closeOutputStream) {
866      outputStream.close();
867      outputStream = null;
868    }
869  }
870}