001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import java.io.ByteArrayOutputStream;
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.List;
030import java.util.concurrent.atomic.AtomicReference;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
034import org.apache.hadoop.hbase.Cell;
035import org.apache.hadoop.hbase.CellComparator;
036import org.apache.hadoop.hbase.CellUtil;
037import org.apache.hadoop.hbase.ExtendedCell;
038import org.apache.hadoop.hbase.KeyValue;
039import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
040import org.apache.hadoop.hbase.PrivateCellUtil;
041import org.apache.hadoop.hbase.io.HeapSize;
042import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
043import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
044import org.apache.hadoop.hbase.nio.ByteBuff;
045import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.ClassSize;
048import org.apache.hadoop.hbase.util.ObjectIntPair;
049import org.apache.hadoop.io.WritableUtils;
050import org.apache.hadoop.util.StringUtils;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read ({@link BlockIndexReader})
 * single-level and multi-level block indexes. Examples of how to use the block index writer can be
 * found in {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
 * {@link HFileWriterImpl}. Examples of how to use the reader can be found in
 * {@link HFileReaderImpl} and org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex.
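 * <p>
 * A rough sketch of the write-side flow (simplified, with illustrative variable names; see
 * {@link HFileWriterImpl} for real usage):
 *
 * <pre>{@code
 * BlockIndexWriter indexWriter = new BlockIndexWriter(blockWriter, cacheConf, fileName, null);
 * // one entry per data block, keyed by the block's first key
 * indexWriter.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
 * // at close time, write any intermediate levels plus the root and remember where the root starts
 * long rootIndexOffset = indexWriter.writeIndexBlocks(outputStream);
 * }</pre>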
061 */
062@InterfaceAudience.Private
063public class HFileBlockIndex {
064
065  private static final Logger LOG = LoggerFactory.getLogger(HFileBlockIndex.class);
066
067  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
068
069  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and root). If not
071   * specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
072   */
073  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
074
075  /**
076   * Minimum number of entries in a single index block. Even if we are above the
077   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
078   * entries. We should have at least a few entries so that we don't have too many levels in the
079   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
080   */
081  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";
082
083  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;
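
  // Illustrative only: both of the keys above are ordinary configuration keys, so (for example)
  // they could be tuned programmatically or in hbase-site.xml; the values below are made up.
  //   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
  //   conf.setInt(HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY, 16);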
084
085  /**
086   * The number of bytes stored in each "secondary index" entry in addition to key bytes in the
087   * non-root index block format. The first long is the file offset of the deeper-level block the
088   * entry points to, and the int that follows is that block's on-disk size without including
089   * header.
090   */
091  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
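
  // For orientation (an informal sketch, derived from how the readers below parse these blocks):
  // after its header, a non-root index block is laid out roughly as
  //   int numEntries
  //   int[numEntries + 1]  secondary index: entry offsets relative to the end of this array
  //   entries, each of:    long blockOffset, int onDiskSizeWithoutHeader, byte[] firstKey
  // so entry i starts at Bytes.SIZEOF_INT * (numEntries + 2) + secondaryIndex[i].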
092
093  /**
094   * Error message when trying to use inline block API in single-level mode.
095   */
096  private static final String INLINE_BLOCKS_NOT_ALLOWED =
097    "Inline blocks are not allowed in the single-level-only mode";
098
099  /**
100   * The size of a meta-data record used for finding the mid-key in a multi-level index. Consists of
101   * the middle leaf-level index block offset (long), its on-disk size without header included
102   * (int), and the mid-key entry's zero-based index in that leaf index block.
103   */
104  protected static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG + 2 * Bytes.SIZEOF_INT;
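
  // This metadata is appended right after the root-level entries by
  // BlockIndexWriter#writeIndexBlocks and read back in readMultiLevelIndexRoot as:
  //   long midLeafBlockOffset, int midLeafBlockOnDiskSize, int midKeyEntry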
105
106  /**
   * An implementation of the BlockIndexReader that deals with block keys which are plain byte[],
   * like the meta block index or the Bloom block index for ROW blooms. It does not need a
   * CellComparator; Bytes.BYTES_RAWCOMPARATOR ordering is sufficient.
110   */
111  static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {
112
113    private byte[][] blockKeys;
114
115    public ByteArrayKeyBlockIndexReader(final int treeLevel) {
116      // Can be null for METAINDEX block
117      searchTreeLevel = treeLevel;
118    }
119
120    @Override
121    protected long calculateHeapSizeForBlockKeys(long heapSize) {
122      // Calculating the size of blockKeys
123      if (blockKeys != null) {
124        heapSize += ClassSize.REFERENCE;
125        // Adding array + references overhead
126        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
127
128        // Adding bytes
129        for (byte[] key : blockKeys) {
130          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
131        }
132      }
133      return heapSize;
134    }
135
136    @Override
137    public boolean isEmpty() {
138      return blockKeys.length == 0;
139    }
140
141    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
143     */
144    public byte[] getRootBlockKey(int i) {
145      return blockKeys[i];
146    }
147
148    @Override
149    public BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key, HFileBlock currentBlock,
150      boolean cacheBlocks, boolean pread, boolean isCompaction,
151      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
152      throws IOException {
      // Not used: this reader handles meta/Bloom indexes, which never load data blocks this way.
154      return null;
155    }
156
157    @Override
158    public Cell midkey(CachingBlockReader cachingBlockReader) throws IOException {
159      // Not needed here
160      return null;
161    }
162
163    @Override
164    protected void initialize(int numEntries) {
165      blockKeys = new byte[numEntries][];
166    }
167
168    @Override
169    protected void add(final byte[] key, final long offset, final int dataSize) {
170      blockOffsets[rootCount] = offset;
171      blockKeys[rootCount] = key;
172      blockDataSizes[rootCount] = dataSize;
173      rootCount++;
174    }
175
176    @Override
177    public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
178      int pos = Bytes.binarySearch(blockKeys, key, offset, length);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
180      // binarySearch's javadoc.
181
182      if (pos >= 0) {
183        // This means this is an exact match with an element of blockKeys.
184        assert pos < blockKeys.length;
185        return pos;
186      }
187
188      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
189      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
190      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
191      // key < blockKeys[0], meaning the file does not contain the given key.
192
193      int i = -pos - 1;
194      assert 0 <= i && i <= blockKeys.length;
195      return i - 1;
196    }
197
198    @Override
199    public int rootBlockContainingKey(Cell key) {
200      // Should not be called on this because here it deals only with byte[]
201      throw new UnsupportedOperationException(
202        "Cannot search for a key that is of Cell type. Only plain byte array keys "
203          + "can be searched for");
204    }
205
206    @Override
207    public String toString() {
208      StringBuilder sb = new StringBuilder();
209      sb.append("size=" + rootCount).append("\n");
210      for (int i = 0; i < rootCount; i++) {
211        sb.append("key=").append(KeyValue.keyToString(blockKeys[i])).append("\n  offset=")
212          .append(blockOffsets[i]).append(", dataSize=" + blockDataSizes[i]).append("\n");
213      }
214      return sb.toString();
215    }
216  }
217
218  /**
   * An implementation of the BlockIndexReader that deals with block keys which are the key part of
   * a cell, like the data block index or the ROW_COL Bloom blocks. This needs a comparator to work
   * with the Cells.
222   */
223  static class CellBasedKeyBlockIndexReader extends BlockIndexReader {
224
225    private ExtendedCell[] blockKeys;
226    /** Pre-computed mid-key */
227    private AtomicReference<ExtendedCell> midKey = new AtomicReference<>();
    /** Needed when doing lookups on blocks. */
229    protected CellComparator comparator;
230
231    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
232      // Can be null for METAINDEX block
233      comparator = c;
234      searchTreeLevel = treeLevel;
235    }
236
237    @Override
238    protected long calculateHeapSizeForBlockKeys(long heapSize) {
239      if (blockKeys != null) {
240        heapSize += ClassSize.REFERENCE;
241        // Adding array + references overhead
242        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);
243
244        // Adding blockKeys
245        for (Cell key : blockKeys) {
246          heapSize += ClassSize.align(key.heapSize());
247        }
248      }
249      // Add comparator and the midkey atomicreference
250      heapSize += 2 * ClassSize.REFERENCE;
251      return heapSize;
252    }
253
254    @Override
255    public boolean isEmpty() {
256      return blockKeys.length == 0;
257    }
258
259    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
261     */
262    public ExtendedCell getRootBlockKey(int i) {
263      return blockKeys[i];
264    }
265
266    @Override
267    public BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key, HFileBlock currentBlock,
268      boolean cacheBlocks, boolean pread, boolean isCompaction,
269      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
270      throws IOException {
271      int rootLevelIndex = rootBlockContainingKey(key);
272      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
273        return null;
274      }
275
276      // the next indexed key
277      ExtendedCell nextIndexedKey = null;
278
279      // Read the next-level (intermediate or leaf) index block.
280      long currentOffset = blockOffsets[rootLevelIndex];
281      int currentOnDiskSize = blockDataSizes[rootLevelIndex];
282
283      if (rootLevelIndex < blockKeys.length - 1) {
284        nextIndexedKey = blockKeys[rootLevelIndex + 1];
285      } else {
286        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
287      }
288
289      int lookupLevel = 1; // How many levels deep we are in our lookup.
290      int index = -1;
291
292      HFileBlock block = null;
293      KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
294      while (true) {
295        try {
          // Must initialize it with null here, because if we don't and an exception happens in
          // readBlock, we'll release the previously assigned block twice in the finally block.
298          // (See HBASE-22422)
299          block = null;
300          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
301            // Avoid reading the same block again, even with caching turned off.
302            // This is crucial for compaction-type workload which might have
303            // caching turned off. This is like a one-block cache inside the
304            // scanner.
305            block = currentBlock;
306          } else {
307            // Call HFile's caching block reader API. We always cache index
308            // blocks, otherwise we might get terrible performance.
309            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
310            BlockType expectedBlockType;
311            if (lookupLevel < searchTreeLevel - 1) {
312              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
313            } else if (lookupLevel == searchTreeLevel - 1) {
314              expectedBlockType = BlockType.LEAF_INDEX;
315            } else {
316              // this also accounts for ENCODED_DATA
317              expectedBlockType = BlockType.DATA;
318            }
319            block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
320              pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
321          }
322
323          if (block == null) {
324            throw new IOException("Failed to read block at offset " + currentOffset
325              + ", onDiskSize=" + currentOnDiskSize);
326          }
327
328          // Found a data block, break the loop and check our level in the tree.
329          if (block.getBlockType().isData()) {
330            break;
331          }
332
333          // Not a data block. This must be a leaf-level or intermediate-level
334          // index block. We don't allow going deeper than searchTreeLevel.
335          if (++lookupLevel > searchTreeLevel) {
336            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
337              + ", searchTreeLevel=" + searchTreeLevel);
338          }
339
340          // Locate the entry corresponding to the given key in the non-root
341          // (leaf or intermediate-level) index block.
342          ByteBuff buffer = block.getBufferWithoutHeader();
343          index = locateNonRootIndexEntry(buffer, key, comparator);
344          if (index == -1) {
345            // This has to be changed
346            // For now change this to key value
347            throw new IOException("The key " + CellUtil.getCellKeyAsString(key) + " is before the"
348              + " first key of the non-root index block " + block);
349          }
350
351          currentOffset = buffer.getLong();
352          currentOnDiskSize = buffer.getInt();
353
354          // Only update next indexed key if there is a next indexed key in the current level
355          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
356          if (nonRootIndexedKey != null) {
357            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
358            nextIndexedKey = tmpNextIndexKV;
359          }
360        } finally {
361          if (block != null && !block.getBlockType().isData()) {
362            // Release the block immediately if it is not the data block
363            block.release();
364          }
365        }
366      }
367
368      if (lookupLevel != searchTreeLevel) {
369        assert block.getBlockType().isData();
        // We reached a data block before descending through all expected index levels, which
        // means the index is inconsistent. Release the block so that its ref count is
        // decremented before we throw.
373        if (block != null) {
374          block.release();
375        }
376        throw new IOException("Reached a data block at level " + lookupLevel
377          + " but the number of levels is " + searchTreeLevel);
378      }
379
380      // set the next indexed key for the current block.
381      return new BlockWithScanInfo(block, nextIndexedKey);
382    }
383
384    @Override
385    public ExtendedCell midkey(CachingBlockReader cachingBlockReader) throws IOException {
386      if (rootCount == 0) {
387        throw new IOException("HFile empty");
388      }
389
390      ExtendedCell targetMidKey = this.midKey.get();
391      if (targetMidKey != null) {
392        return targetMidKey;
393      }
394
395      if (midLeafBlockOffset >= 0) {
396        if (cachingBlockReader == null) {
397          throw new IOException(
398            "Have to read the middle leaf block but " + "no block reader available");
399        }
400
401        // Caching, using pread, assuming this is not a compaction.
402        HFileBlock midLeafBlock = cachingBlockReader.readBlock(midLeafBlockOffset,
403          midLeafBlockOnDiskSize, true, true, false, true, BlockType.LEAF_INDEX, null);
404        try {
405          byte[] bytes = getNonRootIndexedKey(midLeafBlock.getBufferWithoutHeader(), midKeyEntry);
406          assert bytes != null;
407          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
408        } finally {
409          midLeafBlock.release();
410        }
411      } else {
412        // The middle of the root-level index.
413        targetMidKey = blockKeys[rootCount / 2];
414      }
415
416      this.midKey.set(targetMidKey);
417      return targetMidKey;
418    }
419
420    @Override
421    protected void initialize(int numEntries) {
422      blockKeys = new ExtendedCell[numEntries];
423    }
424
425    /**
426     * Adds a new entry in the root block index. Only used when reading.
427     * @param key      Last key in the block
428     * @param offset   file offset where the block is stored
429     * @param dataSize the uncompressed data size
430     */
431    @Override
432    protected void add(final byte[] key, final long offset, final int dataSize) {
433      blockOffsets[rootCount] = offset;
434      // Create the blockKeys as Cells once when the reader is opened
435      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
436      blockDataSizes[rootCount] = dataSize;
437      rootCount++;
438    }
439
440    @Override
441    public int rootBlockContainingKey(final byte[] key, int offset, int length,
442      CellComparator comp) {
      // This should always be called with a Cell, not with a byte[] key
      throw new UnsupportedOperationException("Cannot search for a plain byte array key. "
        + "Only Cell-based keys can be searched for");
446    }
447
448    @Override
449    public int rootBlockContainingKey(Cell key) {
450      // Here the comparator should not be null as this happens for the root-level block
451      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
453      // binarySearch's javadoc.
454
455      if (pos >= 0) {
456        // This means this is an exact match with an element of blockKeys.
457        assert pos < blockKeys.length;
458        return pos;
459      }
460
461      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
462      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
463      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
464      // key < blockKeys[0], meaning the file does not contain the given key.
465
466      int i = -pos - 1;
467      assert 0 <= i && i <= blockKeys.length;
468      return i - 1;
469    }
470
471    @Override
472    public String toString() {
473      StringBuilder sb = new StringBuilder();
474      sb.append("size=" + rootCount).append("\n");
475      for (int i = 0; i < rootCount; i++) {
476        sb.append("key=").append((blockKeys[i])).append("\n  offset=").append(blockOffsets[i])
477          .append(", dataSize=" + blockDataSizes[i]).append("\n");
478      }
479      return sb.toString();
480    }
481  }
482
483  static class CellBasedKeyBlockIndexReaderV2 extends CellBasedKeyBlockIndexReader {
484
485    private HFileIndexBlockEncoder indexBlockEncoder;
486
487    private HFileIndexBlockEncoder.EncodedSeeker seeker;
488
489    public CellBasedKeyBlockIndexReaderV2(final CellComparator c, final int treeLevel) {
490      this(c, treeLevel, null);
491    }
492
493    public CellBasedKeyBlockIndexReaderV2(final CellComparator c, final int treeLevel,
494      HFileIndexBlockEncoder indexBlockEncoder) {
495      super(c, treeLevel);
496      // Can be null for METAINDEX block
497      this.indexBlockEncoder =
498        indexBlockEncoder != null ? indexBlockEncoder : NoOpIndexBlockEncoder.INSTANCE;
499    }
500
501    @Override
502    public boolean isEmpty() {
503      return seeker.isEmpty();
504    }
505
506    @Override
507    public BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key, HFileBlock currentBlock,
508      boolean cacheBlocks, boolean pread, boolean isCompaction,
509      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
510      throws IOException {
511      return seeker.loadDataBlockWithScanInfo(key, currentBlock, cacheBlocks, pread, isCompaction,
512        expectedDataBlockEncoding, cachingBlockReader);
513    }
514
515    @Override
516    public ExtendedCell midkey(CachingBlockReader cachingBlockReader) throws IOException {
517      return seeker.midkey(cachingBlockReader);
518    }
519
520    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
522     */
523    public ExtendedCell getRootBlockKey(int i) {
524      return seeker.getRootBlockKey(i);
525    }
526
527    @Override
528    public int getRootBlockCount() {
529      return seeker.getRootBlockCount();
530    }
531
532    @Override
533    public int rootBlockContainingKey(Cell key) {
534      return seeker.rootBlockContainingKey(key);
535    }
536
537    @Override
538    protected long calculateHeapSizeForBlockKeys(long heapSize) {
539      heapSize = super.calculateHeapSizeForBlockKeys(heapSize);
540      if (seeker != null) {
541        heapSize += ClassSize.REFERENCE;
542        heapSize += ClassSize.align(seeker.heapSize());
543      }
544      return heapSize;
545    }
546
547    @Override
548    public void readMultiLevelIndexRoot(HFileBlock blk, final int numEntries) throws IOException {
549      seeker = indexBlockEncoder.createSeeker();
550      seeker.initRootIndex(blk, numEntries, comparator, searchTreeLevel);
551    }
552
553    @Override
554    public String toString() {
555      return seeker.toString();
556    }
557  }
558
559  /**
   * The reader will always hold the root-level index in memory. Index blocks at all other
561   * levels will be cached in the LRU cache in practice, although this API does not enforce that.
562   * <p>
563   * All non-root (leaf and intermediate) index blocks contain what we call a "secondary index": an
564   * array of offsets to the entries within the block. This allows us to do binary search for the
565   * entry corresponding to the given key without having to deserialize the block.
566   */
567  static abstract class BlockIndexReader implements HeapSize {
568
569    protected long[] blockOffsets;
570    protected int[] blockDataSizes;
571    protected int rootCount = 0;
572
573    // Mid-key metadata.
574    protected long midLeafBlockOffset = -1;
575    protected int midLeafBlockOnDiskSize = -1;
576    protected int midKeyEntry = -1;
577
578    /**
     * The number of levels in the block index tree. One if there is only a root level, two for root
580     * and leaf levels, etc.
581     */
582    protected int searchTreeLevel;
583
584    /** Returns true if the block index is empty. */
585    public abstract boolean isEmpty();
586
587    /**
588     * Verifies that the block index is non-empty and throws an {@link IllegalStateException}
589     * otherwise.
590     */
591    public void ensureNonEmpty() {
592      if (isEmpty()) {
593        throw new IllegalStateException("Block index is empty or not loaded");
594      }
595    }
596
597    /**
598     * Return the data block which contains this key. This function will only be called when the
599     * HFile version is larger than 1.
600     * @param key                       the key we are looking for
601     * @param currentBlock              the current block, to avoid re-reading the same block
602     * @param expectedDataBlockEncoding the data block encoding the caller is expecting the data
603     *                                  block to be in, or null to not perform this check and return
604     *                                  the block irrespective of the encoding
     * @return the data block which contains this key, or null if the key falls before the first
     *         key of the file
606     */
607    public HFileBlock seekToDataBlock(final ExtendedCell key, HFileBlock currentBlock,
608      boolean cacheBlocks, boolean pread, boolean isCompaction,
609      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
610      throws IOException {
611      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
612        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding, cachingBlockReader);
613      if (blockWithScanInfo == null) {
614        return null;
615      } else {
616        return blockWithScanInfo.getHFileBlock();
617      }
618    }
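
    // Illustrative sketch of using seekToDataBlock above (variable names are hypothetical):
    //   HFileBlock dataBlock = index.seekToDataBlock(key, prevBlock, cacheBlocks, pread,
    //     isCompaction, expectedEncoding, cachingBlockReader);
    //   if (dataBlock == null) { ... } // the key falls before the first key of the file
    // Passing the previously returned block as 'prevBlock' lets the index avoid re-reading it.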
619
620    /**
621     * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with other
622     * scan info such as the key that starts the next HFileBlock. This function will only be called
623     * when the HFile version is larger than 1.
624     * @param key                       the key we are looking for
625     * @param currentBlock              the current block, to avoid re-reading the same block
626     * @param expectedDataBlockEncoding the data block encoding the caller is expecting the data
627     *                                  block to be in, or null to not perform this check and return
628     *                                  the block irrespective of the encoding.
629     * @return the BlockWithScanInfo which contains the DataBlock with other scan info such as
630     *         nextIndexedKey.
631     */
632    public abstract BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key,
633      HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction,
634      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
635      throws IOException;
636
637    /**
638     * An approximation to the {@link HFile}'s mid-key. Operates on block boundaries, and does not
639     * go inside blocks. In other words, returns the first key of the middle block of the file.
640     * @return the first key of the middle block
641     */
642    public abstract Cell midkey(CachingBlockReader cachingBlockReader) throws IOException;
643
644    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
646     */
647    public long getRootBlockOffset(int i) {
648      return blockOffsets[i];
649    }
650
651    /**
652     * @param i zero-based index of a root-level block
653     * @return the on-disk size of the root-level block for version 2, or the uncompressed size for
654     *         version 1
655     */
656    public int getRootBlockDataSize(int i) {
657      return blockDataSizes[i];
658    }
659
660    /** Returns the number of root-level blocks in this block index */
661    public int getRootBlockCount() {
662      return rootCount;
663    }
664
665    /**
     * Finds the root-level index block containing the given key.
     * @param key    the key to find
     * @param offset the offset of the key in its byte array
     * @param length the length of the key
     * @param comp   the comparator to be used
     * @return index of the root-level block containing <code>key</code> (between 0 and the number
     *         of blocks - 1), or -1 if this file does not contain the request.
670     */
671    // When we want to find the meta index block or bloom block for ROW bloom
672    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
673    // CellComparator.
674    public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
675      CellComparator comp);
676
677    /**
     * Finds the root-level index block containing the given key.
     * @param key    the key to find
     * @param offset the offset of the key in its byte array
     * @param length the length of the key
     * @return index of the root-level block containing <code>key</code> (between 0 and the number
     *         of blocks - 1), or -1 if this file does not contain the request.
681     */
682    // When we want to find the meta index block or bloom block for ROW bloom
683    // type
684    // Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
685    // need the CellComparator.
686    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
687      return rootBlockContainingKey(key, offset, length, null);
688    }
689
690    /**
     * Finds the root-level index block containing the given key.
     * @param key the key to find
692     */
693    public abstract int rootBlockContainingKey(final Cell key);
694
695    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     * @param nonRootIndex the non-root index block buffer
     * @param i            the ith position
     * @return the indexed key at the ith position, or null if i is out of range
699     */
700    static byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
701      int numEntries = nonRootIndex.getInt(0);
702      if (i < 0 || i >= numEntries) {
703        return null;
704      }
705
706      // Entries start after the number of entries and the secondary index.
707      // The secondary index takes numEntries + 1 ints.
708      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // Target key's offset relative to the end of the secondary index
710      int targetKeyRelOffset = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 1));
711
712      // The offset of the target key in the blockIndex buffer
713      int targetKeyOffset = entriesOffset // Skip secondary index
714        + targetKeyRelOffset // Skip all entries until mid
715        + SECONDARY_INDEX_ENTRY_OVERHEAD; // Skip offset and on-disk-size
716
717      // We subtract the two consecutive secondary index elements, which
718      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
719      // then need to subtract the overhead of offset and onDiskSize.
720      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) - targetKeyRelOffset
721        - SECONDARY_INDEX_ENTRY_OVERHEAD;
722
723      // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
724      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
725    }
726
727    /**
     * Performs a binary search over a non-root level index block. Utilizes the secondary index,
     * which records the offsets of (offset, onDiskSize, firstKey) tuples of all entries.
     * @param key          the key we are searching for
     * @param nonRootIndex the non-root index block buffer, starting with the secondary index. The
     *                     position is ignored.
     * @param comparator   the comparator to use for key comparisons
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key < keys[i + 1], if keys is
     *         the array of all keys being searched, or -1 otherwise
734     */
735    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
736      CellComparator comparator) {
737
738      int numEntries = nonRootIndex.getIntAfterPosition(0);
739      int low = 0;
740      int high = numEntries - 1;
741      int mid = 0;
742
743      // Entries start after the number of entries and the secondary index.
744      // The secondary index takes numEntries + 1 ints.
745      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
746
747      // If we imagine that keys[-1] = -Infinity and
748      // keys[numEntries] = Infinity, then we are maintaining an invariant that
749      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
750      ByteBufferKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferKeyOnlyKeyValue();
751      ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<>();
752      while (low <= high) {
753        mid = low + ((high - low) >> 1);
754
755        // Midkey's offset relative to the end of secondary index
756        int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));
757
758        // The offset of the middle key in the blockIndex buffer
759        int midKeyOffset = entriesOffset // Skip secondary index
760          + midKeyRelOffset // Skip all entries until mid
761          + SECONDARY_INDEX_ENTRY_OVERHEAD; // Skip offset and on-disk-size
762
763        // We subtract the two consecutive secondary index elements, which
764        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
765        // then need to subtract the overhead of offset and onDiskSize.
766        int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2))
767          - midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
768
769        // we have to compare in this order, because the comparator order
770        // has special logic when the 'left side' is a special key.
771        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
772        // done after HBASE-12224 & HBASE-12282
773        // TODO avoid array call.
774        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
775        nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
776        int cmp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, nonRootIndexkeyOnlyKV);
777
778        // key lives above the midpoint
779        if (cmp > 0) low = mid + 1; // Maintain the invariant that keys[low - 1] < key
780        // key lives below the midpoint
781        else if (cmp < 0) high = mid - 1; // Maintain the invariant that key < keys[high + 1]
782        else return mid; // exact match
783      }
784
785      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
786      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
787      // condition, low >= high + 1. Therefore, low = high + 1.
788
789      if (low != high + 1) {
790        throw new IllegalStateException(
791          "Binary search broken: low=" + low + " " + "instead of " + (high + 1));
792      }
793
794      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
795      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
796      int i = low - 1;
797
798      // Some extra validation on the result.
799      if (i < -1 || i >= numEntries) {
800        throw new IllegalStateException("Binary search broken: result is " + i
801          + " but expected to be between -1 and (numEntries - 1) = " + (numEntries - 1));
802      }
803
804      return i;
805    }
806
807    /**
     * Search for one key using the secondary index in a non-root block. In case of success,
     * positions the provided buffer at the entry of interest, where the file offset and the
     * on-disk-size can be read.
     * @param nonRootBlock a non-root block without header. Initial position does not matter.
     * @param key          the key to search for
     * @return the index position where the given key was found, otherwise return -1 in the case the
     *         given key is before the first key.
814     */
815    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key, CellComparator comparator) {
816      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
817
818      if (entryIndex != -1) {
819        int numEntries = nonRootBlock.getIntAfterPosition(0);
820
821        // The end of secondary index and the beginning of entries themselves.
822        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
823
824        // The offset of the entry we are interested in relative to the end of
825        // the secondary index.
826        int entryRelOffset = nonRootBlock.getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));
827
828        nonRootBlock.position(entriesOffset + entryRelOffset);
829      }
830
831      return entryIndex;
832    }
833
834    /**
835     * Read in the root-level index from the given input stream. Must match what was written into
836     * the root level by {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the offset
837     * that function returned.
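     * <p>
     * For orientation: as the read loop below implies, each root-level entry is stored as the
     * block offset (long), then the on-disk data size (int), then the key as a length-prefixed
     * byte array (see {@link Bytes#readByteArray(DataInput)}).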
838     * @param in         the buffered input stream or wrapped byte input stream
839     * @param numEntries the number of root-level index entries
840     */
841    public void readRootIndex(DataInput in, final int numEntries) throws IOException {
842      blockOffsets = new long[numEntries];
843      initialize(numEntries);
844      blockDataSizes = new int[numEntries];
845
846      // If index size is zero, no index was written.
847      if (numEntries > 0) {
848        for (int i = 0; i < numEntries; ++i) {
849          long offset = in.readLong();
850          int dataSize = in.readInt();
851          byte[] key = Bytes.readByteArray(in);
852          add(key, offset, dataSize);
853        }
854      }
855    }
856
857    protected abstract void initialize(int numEntries);
858
859    protected abstract void add(final byte[] key, final long offset, final int dataSize);
860
861    /**
862     * Read in the root-level index from the given input stream. Must match what was written into
863     * the root level by {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the offset
864     * that function returned.
865     * @param blk        the HFile block
866     * @param numEntries the number of root-level index entries
867     * @return the buffered input stream or wrapped byte input stream
868     */
869    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
870      DataInputStream in = blk.getByteStream();
871      readRootIndex(in, numEntries);
872      return in;
873    }
874
875    /**
876     * Read the root-level metadata of a multi-level block index. Based on
877     * {@link #readRootIndex(DataInput, int)}, but also reads metadata necessary to compute the
878     * mid-key in a multi-level index.
879     * @param blk        the HFile block
880     * @param numEntries the number of root-level index entries
881     */
882    public void readMultiLevelIndexRoot(HFileBlock blk, final int numEntries) throws IOException {
883      DataInputStream in = readRootIndex(blk, numEntries);
      // HFileBlock.getByteStream() returns a byte stream for reading the data (excluding checksum)
885      // of root index block, so after reading the root index there is no need to subtract the
886      // checksum bytes.
887      if (in.available() < MID_KEY_METADATA_SIZE) {
888        // No mid-key metadata available.
889        return;
890      }
891      midLeafBlockOffset = in.readLong();
892      midLeafBlockOnDiskSize = in.readInt();
893      midKeyEntry = in.readInt();
894    }
895
896    @Override
897    public long heapSize() {
      // The base BlockIndexReader does not hold the blockKeys, comparator or the midKey atomic
      // reference; those are accounted for by subclasses in calculateHeapSizeForBlockKeys.
899      long heapSize =
900        ClassSize.align(3 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
901
902      // Mid-key metadata.
903      heapSize += MID_KEY_METADATA_SIZE;
904
905      heapSize = calculateHeapSizeForBlockKeys(heapSize);
906
907      if (blockOffsets != null) {
908        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length * Bytes.SIZEOF_LONG);
909      }
910
911      if (blockDataSizes != null) {
912        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length * Bytes.SIZEOF_INT);
913      }
914
915      return ClassSize.align(heapSize);
916    }
917
918    protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
919  }
920
921  /**
   * Writes the block index into the output stream. Generates the tree from the bottom up. The leaf
   * level is written to disk as a sequence of inline blocks, if it is larger than a certain number
   * of bytes. If the leaf level is not large enough, we write all entries to the root level
   * instead. After all leaf blocks have been written, we end up with an index referencing the
   * resulting leaf index blocks. If that index is larger than the allowed root index size, the
   * writer will break it up into reasonable-size intermediate-level index block chunks, write
   * those chunks out, and create another index referencing those chunks. This will be repeated
   * until the remaining index is small enough to become the root index. However, in most practical
   * cases we will only have leaf-level blocks and the root index, or just the root index.
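   * <p>
   * Schematically, for a large index (most files only have the root and possibly leaf levels):
   *
   * <pre>
   *               root index block
   *              /       |        \
   *   intermediate index blocks (only for very large indexes)
   *              /       |        \
   *        leaf index blocks (written inline between data blocks)
   *              /       |        \
   *                 data blocks
   * </pre>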
931   */
932  public static class BlockIndexWriter implements InlineBlockWriter {
933    /**
934     * While the index is being written, this represents the current block index referencing all
935     * leaf blocks, with one exception. If the file is being closed and there are not enough blocks
936     * to complete even a single leaf block, no leaf blocks get written and this contains the entire
937     * block index. After all levels of the index were written by
938     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final root-level index.
939     */
940    private BlockIndexChunk rootChunk = new BlockIndexChunkImpl();
941
942    /**
943     * Current leaf-level chunk. New entries referencing data blocks get added to this chunk until
944     * it grows large enough to be written to disk.
945     */
946    private BlockIndexChunk curInlineChunk = new BlockIndexChunkImpl();
947
948    /**
     * The number of block index levels. This is one if there is only the root level (even empty),
     * two if there are a leaf level and a root level, and is higher if there are intermediate
     * levels. This is
951     * only final after {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The initial
952     * value accounts for the root level, and will be increased to two as soon as we find out there
953     * is a leaf-level in {@link #blockWritten(long, int, int)}.
954     */
955    private int numLevels = 1;
956
957    private HFileBlock.Writer blockWriter;
958    private byte[] firstKey = null;
959
960    /**
961     * The total number of leaf-level entries, i.e. entries referenced by leaf-level blocks. For the
962     * data block index this is equal to the number of data blocks.
963     */
964    private long totalNumEntries;
965
966    /** Total compressed size of all index blocks. */
967    private long totalBlockOnDiskSize;
968
969    /** Total uncompressed size of all index blocks. */
970    private long totalBlockUncompressedSize;
971
972    /** The maximum size guideline of all multi-level index blocks. */
973    private int maxChunkSize;
974
    /** The minimum number of entries in a single index block. */
976    private int minIndexNumEntries;
977
978    /** Whether we require this block index to always be single-level. */
979    private boolean singleLevelOnly;
980
981    /** CacheConfig, or null if cache-on-write is disabled */
982    private CacheConfig cacheConf;
983
984    /** Name to use for computing cache keys */
985    private String nameForCaching;
986
987    /** Type of encoding used for index blocks in HFile */
988    private HFileIndexBlockEncoder indexBlockEncoder;
989
990    /** Creates a single-level block index writer */
991    public BlockIndexWriter() {
992      this(null, null, null, null);
993      singleLevelOnly = true;
994    }
995
996    /**
997     * Creates a multi-level block index writer.
998     * @param blockWriter the block writer to use to write index blocks
999     * @param cacheConf   used to determine when and how a block should be cached-on-write.
1000     */
1001    public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig cacheConf,
1002      String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
1003      if ((cacheConf == null) != (nameForCaching == null)) {
1004        throw new IllegalArgumentException(
1005          "Block cache and file name for " + "caching must be both specified or both null");
1006      }
1007
1008      this.blockWriter = blockWriter;
1009      this.cacheConf = cacheConf;
1010      this.nameForCaching = nameForCaching;
1011      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
1012      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
1013      this.indexBlockEncoder =
1014        indexBlockEncoder != null ? indexBlockEncoder : NoOpIndexBlockEncoder.INSTANCE;
1015    }
1016
1017    public void setMaxChunkSize(int maxChunkSize) {
1018      if (maxChunkSize <= 0) {
1019        throw new IllegalArgumentException("Invalid maximum index block size");
1020      }
1021      this.maxChunkSize = maxChunkSize;
1022    }
1023
1024    public void setMinIndexNumEntries(int minIndexNumEntries) {
1025      if (minIndexNumEntries <= 1) {
        throw new IllegalArgumentException("Invalid minimum index number of entries, should be >= 2");
1027      }
1028      this.minIndexNumEntries = minIndexNumEntries;
1029    }
1030
1031    /**
1032     * Writes the root level and intermediate levels of the block index into the output stream,
1033     * generating the tree from bottom up. Assumes that the leaf level has been inline-written to
1034     * the disk if there is enough data for more than one leaf block. We iterate by breaking the
1035     * current level of the block index, starting with the index of all leaf-level blocks, into
1036     * chunks small enough to be written to disk, and generate its parent level, until we end up
1037     * with a level small enough to become the root level. If the leaf level is not large enough,
1038     * there is no inline block index anymore, so we only write that level of block index to disk as
1039     * the root level.
1040     * @param out FSDataOutputStream
1041     * @return position at which we entered the root-level index.
1042     */
1043    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
1044      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " + "but there are "
          + curInlineChunk.getNumEntries() + " entries in the " + "last inline chunk.");
1047      }
1048
1049      // We need to get mid-key metadata before we create intermediate
1050      // indexes and overwrite the root chunk.
1051      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata() : null;
1052
1053      if (curInlineChunk != null) {
1054        while (
1055          rootChunk.getRootSize() > maxChunkSize
1056            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
1057            && rootChunk.getNumEntries() > minIndexNumEntries
            // Sanity check. We will not hit this: (minIndexNumEntries ^ 16) blocks can be addressed
1059            && numLevels < 16
1060        ) {
1061          rootChunk = writeIntermediateLevel(out, rootChunk);
1062          numLevels += 1;
1063        }
1064      }
1065
1066      // write the root level
1067      long rootLevelIndexPos = out.getPos();
1068
1069      {
1070        DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX);
1071        indexBlockEncoder.encode(rootChunk, true, blockStream);
1072        if (midKeyMetadata != null) blockStream.write(midKeyMetadata);
1073        blockWriter.writeHeaderAndData(out);
1074        if (cacheConf != null) {
1075          cacheConf.getBlockCache().ifPresent(cache -> {
1076            HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1077            cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
1078              blockForCaching.getBlockType()), blockForCaching);
1079            // Index blocks always go to LRU, which then converts any off-heap buffer to on-heap,
1080            // so we need to release any off-heap buffers now to avoid leak
1081            blockForCaching.release();
1082          });
1083        }
1084      }
1085
1086      // Add root index block size
1087      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1088      totalBlockUncompressedSize += blockWriter.getUncompressedSizeWithoutHeader();
1089
1090      if (LOG.isTraceEnabled()) {
1091        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
1092          + rootLevelIndexPos + ", " + rootChunk.getNumEntries() + " root-level entries, "
1093          + totalNumEntries + " total entries, "
1094          + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) + " on-disk size, "
1095          + StringUtils.humanReadableInt(totalBlockUncompressedSize) + " total uncompressed size.");
1096      }
1097      return rootLevelIndexPos;
1098    }
1099
1100    /**
1101     * Writes the block index data as a single level only. Does not do any block framing.
1102     * @param out         the buffered output stream to write the index to. Typically a stream
1103     *                    writing into an {@link HFile} block.
1104     * @param description a short description of the index being written. Used in a log message.
1105     */
1106    public void writeSingleLevelIndex(DataOutput out, String description) throws IOException {
1107      expectNumLevels(1);
1108
1109      if (!singleLevelOnly) throw new IOException("Single-level mode is turned off");
1110
1111      if (rootChunk.getNumEntries() > 0)
1112        throw new IOException("Root-level entries already added in " + "single-level mode");
1113
1114      rootChunk = curInlineChunk;
1115      curInlineChunk = new BlockIndexChunkImpl();
1116
1117      if (LOG.isTraceEnabled()) {
1118        LOG.trace("Wrote a single-level " + description + " index with " + rootChunk.getNumEntries()
1119          + " entries, " + rootChunk.getRootSize() + " bytes");
1120      }
1121      indexBlockEncoder.encode(rootChunk, true, out);
1122    }
1123
1124    /**
1125     * Split the current level of the block index into intermediate index blocks of permitted size
1126     * and write those blocks to disk. Return the next level of the block index referencing those
1127     * intermediate-level blocks.
     * @param currentLevel the current level of the block index, such as the chunk referencing all
1129     *                     leaf-level index blocks
1130     * @return the parent level block index, which becomes the root index after a few (usually zero)
1131     *         iterations
1132     */
1133    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
1134      BlockIndexChunk currentLevel) throws IOException {
1135      // Entries referencing intermediate-level blocks we are about to create.
1136      BlockIndexChunk parent = new BlockIndexChunkImpl();
1137
1138      // The current intermediate-level block index chunk.
1139      BlockIndexChunk curChunk = new BlockIndexChunkImpl();
1140
1141      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
1142        curChunk.add(currentLevel.getBlockKey(i), currentLevel.getBlockOffset(i),
1143          currentLevel.getOnDiskDataSize(i));
1144
1145        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
1146        // we won't end up with too-many levels for a index with very large rowKeys. Also, if the
1147        // first key is larger than maxChunkSize this will cause infinite recursion.
1148        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
1149          writeIntermediateBlock(out, parent, curChunk);
1150        }
1151      }
1152
1153      if (curChunk.getNumEntries() > 0) {
1154        writeIntermediateBlock(out, parent, curChunk);
1155      }
1156
1157      return parent;
1158    }
1159
1160    private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk parent,
1161      BlockIndexChunk curChunk) throws IOException {
1162      long beginOffset = out.getPos();
1163      DataOutputStream dos = blockWriter.startWriting(BlockType.INTERMEDIATE_INDEX);
1164      indexBlockEncoder.encode(curChunk, false, dos);
1165      byte[] curFirstKey = curChunk.getBlockKey(0);
1166      blockWriter.writeHeaderAndData(out);
1167
1168      if (getCacheOnWrite()) {
1169        cacheConf.getBlockCache().ifPresent(cache -> {
1170          HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
1171          cache.cacheBlock(
1172            new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
1173            blockForCaching);
1174        });
1175      }
1176
1177      // Add intermediate index block size
1178      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
1179      totalBlockUncompressedSize += blockWriter.getUncompressedSizeWithoutHeader();
1180
      // OFFSET is the beginning offset of the chunk of block index entries.
1182      // SIZE is the total byte size of the chunk of block index entries
1183      // + the secondary index size
1184      // FIRST_KEY is the first key in the chunk of block index
1185      // entries.
1186      parent.add(curFirstKey, beginOffset, blockWriter.getOnDiskSizeWithHeader());
1187
1188      // clear current block index chunk
1189      curChunk.clear();
1190      curFirstKey = null;
1191    }
1192
1193    /** Returns how many block index entries there are in the root level */
1194    public final int getNumRootEntries() {
1195      return rootChunk.getNumEntries();
1196    }
1197
1198    /** Returns the number of levels in this block index. */
1199    public int getNumLevels() {
1200      return numLevels;
1201    }
1202
1203    private void expectNumLevels(int expectedNumLevels) {
1204      if (numLevels != expectedNumLevels) {
1205        throw new IllegalStateException("Number of block index levels is " + numLevels
1206          + "but is expected to be " + expectedNumLevels);
1207      }
1208    }
1209
1210    /**
     * Whether there is an inline block ready to be written. In general, we write a leaf-level
1212     * index block as an inline block as soon as its size as serialized in the non-root format
1213     * reaches a certain threshold.
1214     */
1215    @Override
1216    public boolean shouldWriteBlock(boolean closing) {
1217      if (singleLevelOnly) {
1218        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1219      }
1220
1221      if (curInlineChunk == null) {
1222        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been "
1223          + "called with closing=true and then called again?");
1224      }
1225
1226      if (curInlineChunk.getNumEntries() == 0) {
1227        return false;
1228      }
1229
1230      // We do have some entries in the current inline chunk.
1231      if (closing) {
1232        if (rootChunk.getNumEntries() == 0) {
1233          // We did not add any leaf-level blocks yet. Instead of creating a
1234          // leaf level with one block, move these entries to the root level.
1235
1236          expectNumLevels(1);
1237          rootChunk = curInlineChunk;
1238          curInlineChunk = null; // Disallow adding any more index entries.
1239          return false;
1240        }
1241
1242        return true;
1243      } else {
1244        return curInlineChunk.getNonRootSize() >= maxChunkSize;
1245      }
1246    }
1247
1248    /**
1249     * Write out the current inline index block. Inline blocks are non-root blocks, so the non-root
1250     * index format is used.
1251     */
1252    @Override
1253    public void writeInlineBlock(DataOutput out) throws IOException {
1254      if (singleLevelOnly) throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1255
1256      // Write the inline block index to the output stream in the non-root
1257      // index block format.
1258      indexBlockEncoder.encode(curInlineChunk, false, out);
1259
1260      // Save the first key of the inline block so that we can add it to the
1261      // parent-level index.
1262      firstKey = curInlineChunk.getBlockKey(0);
1263
1264      // Start a new inline index block
1265      curInlineChunk.clear();
1266    }
1267
1268    /**
1269     * Called after an inline block has been written so that we can add an entry referring to that
1270     * block to the parent-level index.
1271     */
1272    @Override
1273    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1274      // Add leaf index block size
1275      totalBlockOnDiskSize += onDiskSize;
1276      totalBlockUncompressedSize += uncompressedSize;
1277
1278      if (singleLevelOnly) throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1279
1280      if (firstKey == null) {
1281        throw new IllegalStateException(
1282          "Trying to add second-level index entry with offset=" + offset + " and onDiskSize="
1283            + onDiskSize + " but the first key was not set in writeInlineBlock");
1284      }
1285
1286      if (rootChunk.getNumEntries() == 0) {
1287        // We are writing the first leaf block, so increase index level.
1288        expectNumLevels(1);
1289        numLevels = 2;
1290      }
1291
1292      // Add another entry to the second-level index. Include the number of
1293      // entries in all previous leaf-level chunks for mid-key calculation.
1294      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1295      firstKey = null;
1296    }
1297
1298    @Override
1299    public BlockType getInlineBlockType() {
1300      return BlockType.LEAF_INDEX;
1301    }
1302
1303    /**
1304     * Add one index entry to the current leaf-level block. When the leaf-level block gets large
1305     * enough, it will be flushed to disk as an inline block.
1306     * @param firstKey      the first key of the data block
1307     * @param blockOffset   the offset of the data block
1308     * @param blockDataSize the on-disk size of the data block ({@link HFile} format version 2), or
1309     *                      the uncompressed size of the data block ({@link HFile} format version
1310     *                      1).
1311     */
1312    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1313      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1314      ++totalNumEntries;
1315    }
1316
1317    /**
1318     * @throws IOException if we happened to write a multi-level index.
1319     */
1320    public void ensureSingleLevel() throws IOException {
1321      if (numLevels > 1) {
1322        throw new IOException(
1323          "Wrote a " + numLevels + "-level index with " + rootChunk.getNumEntries()
1324            + " root-level entries, but " + "this is expected to be a single-level block index.");
1325      }
1326    }
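
    /*
     * Sketch of the single-level pattern (as done for Bloom filter chunk indexes; construction of
     * the writer is elided and the variable names below are illustrative):
     *
     *   BlockIndexWriter bw = ...;                           // created in single-level-only mode
     *   bw.addEntry(chunkFirstKey, chunkOffset, chunkSize);  // one call per chunk written out
     *   ...
     *   bw.ensureSingleLevel();  // at close: fails if the index unexpectedly became multi-level
     */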
1327
1328    /**
1329     * @return true if we are using cache-on-write. This is configured by the caller of the
1330     *         constructor by either passing a valid block cache or null.
1331     */
1332    @Override
1333    public boolean getCacheOnWrite() {
1334      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1335    }
1336
1337    /**
1338     * The total uncompressed size of the root index block, intermediate-level index blocks, and
1339     * leaf-level index blocks.
1340     * @return the total uncompressed size of all index blocks
1341     */
1342    public long getTotalUncompressedSize() {
1343      return totalBlockUncompressedSize;
1344    }
1345
1346  }
1347
1348  /**
1349   * A single chunk of the block index in the process of writing. The data in this chunk can become
1350   * a leaf-level, intermediate-level, or root index block.
1351   */
1352  static class BlockIndexChunkImpl implements BlockIndexChunk {
1353
1354    /** First keys of the key range corresponding to each index entry. */
1355    private final List<byte[]> blockKeys = new ArrayList<>();
1356
1357    /** Block offset in backing stream. */
1358    private final List<Long> blockOffsets = new ArrayList<>();
1359
1360    /** On-disk data sizes of lower-level data or index blocks. */
1361    private final List<Integer> onDiskDataSizes = new ArrayList<>();
1362
1363    /**
1364     * The cumulative number of sub-entries, i.e. entries in the deeper-level index blocks.
1365     * numSubEntriesAt[i] is the number of sub-entries in the blocks corresponding to this chunk's
1366     * entries #0 through #i inclusively.
1367     */
1368    private final List<Long> numSubEntriesAt = new ArrayList<>();
1369
1370    /**
1371     * The offset of the next entry to be added, relative to the end of the "secondary index" in the
1372     * "non-root" format representation of this index chunk. This is the next value to be added to
1373     * the secondary index.
1374     */
1375    private int curTotalNonRootEntrySize = 0;
1376
1377    /**
1378     * The accumulated size of this chunk if stored in the root index format.
1379     */
1380    private int curTotalRootSize = 0;
1381
1382    /**
1383     * The "secondary index" used for binary search over variable-length records in a "non-root"
1384     * format block. These offsets are relative to the end of this secondary index.
1385     */
1386    private final List<Integer> secondaryIndexOffsetMarks = new ArrayList<>();
1387
1388    /**
1389     * Adds a new entry to this block index chunk.
1390     * @param firstKey              the first key in the block pointed to by this entry
1391     * @param blockOffset           the offset of the next-level block pointed to by this entry
1392     * @param onDiskDataSize        the on-disk data size of the block pointed to by this entry,
1393     *                              including header size
1394     * @param curTotalNumSubEntries if this chunk is the root index chunk under construction, this
1395     *                              specifies the current total number of sub-entries in all
1396     *                              leaf-level chunks, including the one corresponding to the
1397     *                              second-level entry being added.
1398     */
1399    @Override
1400    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1401      long curTotalNumSubEntries) {
1402      // Record the offset for the secondary index
1403      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1404      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD + firstKey.length;
1405
1406      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1407        + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1408
1409      blockKeys.add(firstKey);
1410      blockOffsets.add(blockOffset);
1411      onDiskDataSizes.add(onDiskDataSize);
1412
1413      if (curTotalNumSubEntries != -1) {
1414        numSubEntriesAt.add(curTotalNumSubEntries);
1415
1416        // Make sure the parallel arrays are in sync.
1417        if (numSubEntriesAt.size() != blockKeys.size()) {
1418          throw new IllegalStateException("Only have key/value count " + "stats for "
1419            + numSubEntriesAt.size() + " block index " + "entries out of " + blockKeys.size());
1420        }
1421      }
1422    }
1423
1424    /**
1425     * The same as {@link #add(byte[], long, int, long)} but does not maintain the cumulative
1426     * sub-entry (key/value) count. Used for single-level indexes.
1427     * @see #add(byte[], long, int, long)
1428     */
1429    @Override
1430    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1431      add(firstKey, blockOffset, onDiskDataSize, -1);
1432    }
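
    /*
     * Worked example of the bookkeeping done by add(): for a 10-byte first key (and assuming
     * SECONDARY_INDEX_ENTRY_OVERHEAD covers the fixed 8-byte offset plus 4-byte size of a
     * non-root entry), one call appends the previous curTotalNonRootEntrySize to
     * secondaryIndexOffsetMarks, grows curTotalNonRootEntrySize by
     * SECONDARY_INDEX_ENTRY_OVERHEAD + 10, and grows curTotalRootSize by
     * 8 (long offset) + 4 (int size) + 1 (vint of the key length) + 10 = 23 bytes.
     */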
1433
1434    @Override
1435    public void clear() {
1436      blockKeys.clear();
1437      blockOffsets.clear();
1438      onDiskDataSizes.clear();
1439      secondaryIndexOffsetMarks.clear();
1440      numSubEntriesAt.clear();
1441      curTotalNonRootEntrySize = 0;
1442      curTotalRootSize = 0;
1443    }
1444
1445    /**
1446     * Finds the entry corresponding to the deeper-level index block containing the given
1447     * deeper-level entry (a "sub-entry"), assuming a global 0-based ordering of sub-entries.
1448     * <p>
1449     * <i> Implementation note. </i> We are looking for i such that numSubEntriesAt[i - 1] <= k <
1450     * numSubEntriesAt[i], because a deeper-level block #i (0-based) contains sub-entries
1451     * #numSubEntriesAt[i - 1] through #(numSubEntriesAt[i] - 1), assuming a global 0-based
1452     * ordering of sub-entries. i is by definition the insertion point of k in numSubEntriesAt.
1453     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1454     * @return the 0-based index of the entry corresponding to the given sub-entry
1455     */
1456    @Override
1457    public int getEntryBySubEntry(long k) {
1458      // We define mid-key as the key corresponding to k'th sub-entry
1459      // (0-based).
1460
1461      int i = Collections.binarySearch(numSubEntriesAt, k);
1462
1463      // Exact match: cumulativeWeight[i] = k. This means chunks #0 through
1464      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1465      // is in the (i + 1)'th chunk.
1466      if (i >= 0) return i + 1;
1467
1468      // Inexact match. Return the insertion point.
1469      return -i - 1;
1470    }
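
    /*
     * Worked example: with numSubEntriesAt = [3, 7, 12], chunk #0 holds sub-entries 0-2, chunk #1
     * holds 3-6 and chunk #2 holds 7-11. getEntryBySubEntry(5) misses in the binary search
     * (insertion point 1) and returns 1, while getEntryBySubEntry(7) matches index 1 exactly and
     * returns 2, because sub-entry #7 is the first sub-entry of chunk #2.
     */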
1471
1472    /**
1473     * Used when writing the root block index of a multi-level block index. Serializes additional
1474     * information that allows the mid-key to be identified efficiently.
1475     * @return a few serialized fields for finding the mid-key
1476     * @throws IOException if the metadata needed to compute the mid-key could not be created
1477     */
1478    @Override
1479    public byte[] getMidKeyMetadata() throws IOException {
1480      ByteArrayOutputStream baos = new ByteArrayOutputStream(MID_KEY_METADATA_SIZE);
1481      DataOutputStream baosDos = new DataOutputStream(baos);
1482      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1483      if (totalNumSubEntries == 0) {
1484        throw new IOException("No leaf-level entries, mid-key unavailable");
1485      }
1486      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1487      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1488
1489      baosDos.writeLong(blockOffsets.get(midKeyEntry));
1490      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1491
1492      long numSubEntriesBefore = midKeyEntry > 0 ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1493      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1494      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
1495        throw new IOException("Could not identify mid-key index within the "
1496          + "leaf-level block containing mid-key: out of range (" + subEntryWithinEntry
1497          + ", numSubEntriesBefore=" + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1498          + ")");
1499      }
1500
1501      baosDos.writeInt((int) subEntryWithinEntry);
1502
1503      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1504        throw new IOException("Could not write mid-key metadata: size=" + baosDos.size()
1505          + ", correct size: " + MID_KEY_METADATA_SIZE);
1506      }
1507
1508      // Close just to be good citizens, although this has no effect.
1509      baos.close();
1510
1511      return baos.toByteArray();
1512    }
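
    /*
     * Continuing the example above (numSubEntriesAt = [3, 7, 12]): totalNumSubEntries = 12, so
     * midKeySubEntry = (12 - 1) / 2 = 5 and getEntryBySubEntry(5) = 1. The metadata therefore
     * records blockOffsets.get(1), onDiskDataSizes.get(1) and subEntryWithinEntry = 5 - 3 = 2,
     * i.e. the mid-key is the third key of the second leaf-level block.
     */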
1513
1514    /** Returns the size of this chunk if stored in the non-root index block format */
1515    @Override
1516    public int getNonRootSize() {
1517      return Bytes.SIZEOF_INT // Number of entries
1518        + Bytes.SIZEOF_INT * (blockKeys.size() + 1) // Secondary index
1519        + curTotalNonRootEntrySize; // All entries
1520    }
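
    /*
     * For example, a chunk with three entries whose keys are 10, 12 and 15 bytes long takes
     * 4 (entry count) + 4 * (3 + 1) (secondary index) plus
     * 3 * SECONDARY_INDEX_ENTRY_OVERHEAD + 37 bytes of entry data in the non-root format.
     */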
1521
1522    @Override
1523    public int getCurTotalNonRootEntrySize() {
1524      return curTotalNonRootEntrySize;
1525    }
1526
1527    @Override
1528    public List<byte[]> getBlockKeys() {
1529      return blockKeys;
1530    }
1531
1532    @Override
1533    public List<Integer> getSecondaryIndexOffsetMarks() {
1534      return secondaryIndexOffsetMarks;
1535    }
1536
1537    /** Returns the size of this chunk if stored in the root index block format */
1538    @Override
1539    public int getRootSize() {
1540      return curTotalRootSize;
1541    }
1542
1543    /** Returns the number of entries in this block index chunk */
1544    public int getNumEntries() {
1545      return blockKeys.size();
1546    }
1547
1548    public byte[] getBlockKey(int i) {
1549      return blockKeys.get(i);
1550    }
1551
1552    public long getBlockOffset(int i) {
1553      return blockOffsets.get(i);
1554    }
1555
1556    public int getOnDiskDataSize(int i) {
1557      return onDiskDataSizes.get(i);
1558    }
1559
1560    public long getCumulativeNumKV(int i) {
1561      if (i < 0) return 0;
1562      return numSubEntriesAt.get(i);
1563    }
1564
1565  }
1566
1567  public static int getMaxChunkSize(Configuration conf) {
1568    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1569  }
1570
1571  public static int getMinIndexNumEntries(Configuration conf) {
1572    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
1573  }
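
  /*
   * A hedged configuration sketch (values are illustrative only): tuning the index chunk size and
   * the minimum per-block entry count through the HBase configuration.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024);
   *   conf.setInt(HFileBlockIndex.MIN_INDEX_NUM_ENTRIES_KEY, 16);
   *   int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf);
   *   int minEntries = HFileBlockIndex.getMinIndexNumEntries(conf);
   */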
1574}