001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.util.Arrays;
021import java.util.Comparator;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Queue;
026import java.util.Set;
027import java.util.concurrent.atomic.LongAdder;
028import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
029import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
035import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
036import org.apache.hbase.thirdparty.com.google.common.collect.MinMaxPriorityQueue;
037import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
038import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.LinkedMap;
039
040/**
041 * This class is used to allocate a block with specified size and free the block when evicting. It
042 * manages an array of buckets, each bucket is associated with a size and caches elements up to this
043 * size. For a completely empty bucket, this size could be re-specified dynamically.
044 * <p/>
045 * This class is not thread safe.
046 */
047@InterfaceAudience.Private
048public final class BucketAllocator {
049  private static final Logger LOG = LoggerFactory.getLogger(BucketAllocator.class);
050
051  public final static class Bucket {
052    private long baseOffset;
053    private int itemAllocationSize, sizeIndex;
054    private int itemCount;
055    private int freeList[];
056    private int freeCount, usedCount;
057
058    public Bucket(long offset) {
059      baseOffset = offset;
060      sizeIndex = -1;
061    }
062
063    void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
064      Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
065      this.sizeIndex = sizeIndex;
066      itemAllocationSize = bucketSizes[sizeIndex];
067      itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
068      freeCount = itemCount;
069      usedCount = 0;
070      freeList = new int[itemCount];
071      for (int i = 0; i < freeCount; ++i)
072        freeList[i] = i;
073    }
074
075    public boolean isUninstantiated() {
076      return sizeIndex == -1;
077    }
078
079    public int sizeIndex() {
080      return sizeIndex;
081    }
082
083    public int getItemAllocationSize() {
084      return itemAllocationSize;
085    }
086
087    public boolean hasFreeSpace() {
088      return freeCount > 0;
089    }
090
091    public boolean isCompletelyFree() {
092      return usedCount == 0;
093    }
094
095    public int freeCount() {
096      return freeCount;
097    }
098
099    public int usedCount() {
100      return usedCount;
101    }
102
103    public int getFreeBytes() {
104      return freeCount * itemAllocationSize;
105    }
106
107    public int getUsedBytes() {
108      return usedCount * itemAllocationSize;
109    }
110
111    public long getBaseOffset() {
112      return baseOffset;
113    }
114
115    /**
116     * Allocate a block in this bucket, return the offset representing the position in physical
117     * space
118     * @return the offset in the IOEngine
119     */
120    public long allocate() {
121      assert freeCount > 0; // Else should not have been called
122      assert sizeIndex != -1;
123      ++usedCount;
124      long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
125      assert offset >= 0;
126      return offset;
127    }
128
129    public boolean addAllocation(long offset) throws BucketAllocatorException {
130      offset -= baseOffset;
131      if (offset < 0 || offset % itemAllocationSize != 0)
132        throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
133          + " base=" + baseOffset + ", bucket size=" + itemAllocationSize);
134      int idx = (int) (offset / itemAllocationSize);
135      boolean matchFound = false;
136      for (int i = 0; i < freeCount; ++i) {
137        if (matchFound) freeList[i - 1] = freeList[i];
138        else if (freeList[i] == idx) matchFound = true;
139      }
140      if (!matchFound) {
141        LOG.warn("We found more entries for bucket starting at offset {} for blocks of {} size. "
142          + "Skipping entry at cache offset {}", baseOffset, itemAllocationSize, offset);
143        return false;
144      }
145      ++usedCount;
146      --freeCount;
147      return true;
148    }
149
150    private void free(long offset) {
151      offset -= baseOffset;
152      assert offset >= 0;
153      assert offset < itemCount * itemAllocationSize;
154      assert offset % itemAllocationSize == 0;
155      assert usedCount > 0;
156      assert freeCount < itemCount; // Else duplicate free
157      int item = (int) (offset / (long) itemAllocationSize);
158      assert !freeListContains(item);
159      --usedCount;
160      freeList[freeCount++] = item;
161    }
162
163    private boolean freeListContains(int blockNo) {
164      for (int i = 0; i < freeCount; ++i) {
165        if (freeList[i] == blockNo) return true;
166      }
167      return false;
168    }
169  }
170
171  final class BucketSizeInfo {
172    // Free bucket means it has space to allocate a block;
173    // Completely free bucket means it has no block.
174    private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
175    // only modified under synchronization, but also read outside it.
176    private volatile long fragmentationBytes;
177    private int sizeIndex;
178
179    BucketSizeInfo(int sizeIndex) {
180      bucketList = new LinkedMap();
181      freeBuckets = new LinkedMap();
182      completelyFreeBuckets = new LinkedMap();
183      fragmentationBytes = 0;
184      this.sizeIndex = sizeIndex;
185    }
186
187    public synchronized void instantiateBucket(Bucket b) {
188      assert b.isUninstantiated() || b.isCompletelyFree();
189      b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
190      bucketList.put(b, b);
191      freeBuckets.put(b, b);
192      completelyFreeBuckets.put(b, b);
193    }
194
195    public int sizeIndex() {
196      return sizeIndex;
197    }
198
199    /**
200     * Find a bucket to allocate a block
201     * @return the offset in the IOEngine
202     */
203    public long allocateBlock(int blockSize) {
204      Bucket b = null;
205      if (freeBuckets.size() > 0) {
206        // Use up an existing one first...
207        b = (Bucket) freeBuckets.lastKey();
208      }
209      if (b == null) {
210        b = grabGlobalCompletelyFreeBucket();
211        if (b != null) instantiateBucket(b);
212      }
213      if (b == null) return -1;
214      long result = b.allocate();
215      blockAllocated(b);
216      if (blockSize < b.getItemAllocationSize()) {
217        fragmentationBytes += b.getItemAllocationSize() - blockSize;
218      }
219      return result;
220    }
221
222    void blockAllocated(Bucket b) {
223      if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
224      if (!b.hasFreeSpace()) freeBuckets.remove(b);
225    }
226
227    public Bucket findAndRemoveCompletelyFreeBucket() {
228      Bucket b = null;
229      assert bucketList.size() > 0;
230      if (bucketList.size() == 1) {
231        // So we never get complete starvation of a bucket for a size
232        return null;
233      }
234
235      if (completelyFreeBuckets.size() > 0) {
236        b = (Bucket) completelyFreeBuckets.firstKey();
237        removeBucket(b);
238      }
239      return b;
240    }
241
242    private synchronized void removeBucket(Bucket b) {
243      assert b.isCompletelyFree();
244      bucketList.remove(b);
245      freeBuckets.remove(b);
246      completelyFreeBuckets.remove(b);
247    }
248
249    public void freeBlock(Bucket b, long offset, int length) {
250      assert bucketList.containsKey(b);
251      // else we shouldn't have anything to free...
252      assert (!completelyFreeBuckets.containsKey(b));
253      b.free(offset);
254      if (length < b.getItemAllocationSize()) {
255        fragmentationBytes -= b.getItemAllocationSize() - length;
256      }
257      if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
258      if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
259    }
260
261    public synchronized IndexStatistics statistics() {
262      long free = 0, used = 0;
263      int full = 0;
264      for (Object obj : bucketList.keySet()) {
265        Bucket b = (Bucket) obj;
266        free += b.freeCount();
267        used += b.usedCount();
268        if (!b.hasFreeSpace()) {
269          full++;
270        }
271      }
272      int bucketObjectSize = bucketSizes[sizeIndex];
273      // this is most likely to always be 1 or 0
274      int fillingBuckets = Math.max(0, freeBuckets.size() - completelyFreeBuckets.size());
275      // if bucket capacity is not perfectly divisible by a bucket's object size, there will
276      // be some left over per bucket. for some object sizes this may be large enough to be
277      // non-trivial and worth tuning by choosing a more divisible object size.
278      long wastedBytes = (bucketCapacity % bucketObjectSize) * (full + fillingBuckets);
279      return new IndexStatistics(free, used, bucketObjectSize, full, completelyFreeBuckets.size(),
280        wastedBytes, fragmentationBytes);
281    }
282
283    @Override
284    public String toString() {
285      return MoreObjects.toStringHelper(this.getClass()).add("sizeIndex", sizeIndex)
286        .add("bucketSize", bucketSizes[sizeIndex]).toString();
287    }
288  }
289
290  // Default block size in hbase is 64K, so we choose more sizes near 64K, you'd better
291  // reset it according to your cluster's block size distribution
292  // The real block size in hfile maybe a little larger than the size we configured ,
293  // so we need add extra 1024 bytes for fit.
294  // TODO Support the view of block size distribution statistics
295  private static final int DEFAULT_BUCKET_SIZES[] =
296    { 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024,
297      48 * 1024 + 1024, 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
298      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024, 512 * 1024 + 1024 };
299
300  /**
301   * Round up the given block size to bucket size, and get the corresponding BucketSizeInfo
302   */
303  public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
304    for (int i = 0; i < bucketSizes.length; ++i)
305      if (blockSize <= bucketSizes[i]) return bucketSizeInfos[i];
306    return null;
307  }
308
309  /**
310   * So, what is the minimum amount of items we'll tolerate in a single bucket?
311   */
312  static public final int FEWEST_ITEMS_IN_BUCKET = 4;
313
314  private final int[] bucketSizes;
315  private final int bigItemSize;
316  // The capacity size for each bucket
317  private final long bucketCapacity;
318  private Bucket[] buckets;
319  private BucketSizeInfo[] bucketSizeInfos;
320  private final long totalSize;
321  private transient long usedSize = 0;
322
323  BucketAllocator(long availableSpace, int[] bucketSizes) throws BucketAllocatorException {
324    this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
325    Arrays.sort(this.bucketSizes);
326    this.bigItemSize = Ints.max(this.bucketSizes);
327    this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * (long) bigItemSize;
328    buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
329    if (buckets.length < this.bucketSizes.length)
330      throw new BucketAllocatorException("Bucket allocator size too small (" + buckets.length
331        + "); must have room for at least " + this.bucketSizes.length + " buckets");
332    bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
333    for (int i = 0; i < this.bucketSizes.length; ++i) {
334      bucketSizeInfos[i] = new BucketSizeInfo(i);
335    }
336    for (int i = 0; i < buckets.length; ++i) {
337      buckets[i] = new Bucket(bucketCapacity * i);
338      bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
339        .instantiateBucket(buckets[i]);
340    }
341    this.totalSize = ((long) buckets.length) * bucketCapacity;
342    if (LOG.isInfoEnabled()) {
343      LOG.info("Cache totalSize=" + this.totalSize + ", buckets=" + this.buckets.length
344        + ", bucket capacity=" + this.bucketCapacity + "=(" + FEWEST_ITEMS_IN_BUCKET + "*"
345        + this.bigItemSize + ")="
346        + "(FEWEST_ITEMS_IN_BUCKET*(largest configured bucketcache size))");
347    }
348  }
349
350  /**
351   * Rebuild the allocator's data structures from a persisted map.
352   * @param availableSpace capacity of cache
353   * @param map            A map stores the block key and BucketEntry(block's meta data like offset,
354   *                       length)
355   * @param realCacheSize  cached data size statistics for bucket cache
356   */
357  BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
358    LongAdder realCacheSize) throws BucketAllocatorException {
359    this(availableSpace, bucketSizes);
360
361    // each bucket has an offset, sizeindex. probably the buckets are too big
362    // in our default state. so what we do is reconfigure them according to what
363    // we've found. we can only reconfigure each bucket once; if more than once,
364    // we know there's a bug, so we just log the info, throw, and start again...
365    boolean[] reconfigured = new boolean[buckets.length];
366    int sizeNotMatchedCount = 0;
367    int insufficientCapacityCount = 0;
368    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
369    while (iterator.hasNext()) {
370      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
371      long foundOffset = entry.getValue().offset();
372      int foundLen = entry.getValue().getLength();
373      int bucketSizeIndex = -1;
374      for (int i = 0; i < this.bucketSizes.length; ++i) {
375        if (foundLen <= this.bucketSizes[i]) {
376          bucketSizeIndex = i;
377          break;
378        }
379      }
380      if (bucketSizeIndex == -1) {
381        sizeNotMatchedCount++;
382        iterator.remove();
383        continue;
384      }
385      int bucketNo = (int) (foundOffset / bucketCapacity);
386      if (bucketNo < 0 || bucketNo >= buckets.length) {
387        insufficientCapacityCount++;
388        iterator.remove();
389        continue;
390      }
391      Bucket b = buckets[bucketNo];
392      if (reconfigured[bucketNo]) {
393        if (b.sizeIndex() != bucketSizeIndex) {
394          throw new BucketAllocatorException("Inconsistent allocation in bucket map;");
395        }
396      } else {
397        if (!b.isCompletelyFree()) {
398          throw new BucketAllocatorException(
399            "Reconfiguring bucket " + bucketNo + " but it's already allocated; corrupt data");
400        }
401        // Need to remove the bucket from whichever list it's currently in at
402        // the moment...
403        BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
404        BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
405        oldbsi.removeBucket(b);
406        bsi.instantiateBucket(b);
407        reconfigured[bucketNo] = true;
408      }
409      if (buckets[bucketNo].addAllocation(foundOffset)) {
410        realCacheSize.add(foundLen);
411        usedSize += buckets[bucketNo].getItemAllocationSize();
412        bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
413      }
414    }
415
416    if (sizeNotMatchedCount > 0) {
417      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
418        + "there is no matching bucket size for these blocks");
419    }
420    if (insufficientCapacityCount > 0) {
421      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
422        + "did you shrink the cache?");
423    }
424  }
425
426  @Override
427  public String toString() {
428    StringBuilder sb = new StringBuilder(1024);
429    for (int i = 0; i < buckets.length; ++i) {
430      Bucket b = buckets[i];
431      if (i > 0) sb.append(", ");
432      sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
433      sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
434    }
435    return sb.toString();
436  }
437
438  public long getUsedSize() {
439    return this.usedSize;
440  }
441
442  public long getFreeSize() {
443    return this.totalSize - getUsedSize();
444  }
445
446  public long getTotalSize() {
447    return this.totalSize;
448  }
449
450  /**
451   * Allocate a block with specified size. Return the offset
452   * @param blockSize size of block
453   * @return the offset in the IOEngine
454   */
455  public synchronized long allocateBlock(int blockSize)
456    throws CacheFullException, BucketAllocatorException {
457    assert blockSize > 0;
458    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
459    if (bsi == null) {
460      throw new BucketAllocatorException("Allocation too big size=" + blockSize
461        + "; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY
462        + " to accomodate if size seems reasonable and you want it cached.");
463    }
464    long offset = bsi.allocateBlock(blockSize);
465
466    // Ask caller to free up space and try again!
467    if (offset < 0) throw new CacheFullException(blockSize, bsi.sizeIndex());
468    usedSize += bucketSizes[bsi.sizeIndex()];
469    return offset;
470  }
471
472  private Bucket grabGlobalCompletelyFreeBucket() {
473    for (BucketSizeInfo bsi : bucketSizeInfos) {
474      Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
475      if (b != null) return b;
476    }
477    return null;
478  }
479
480  /**
481   * Free a block with the offset
482   * @param offset block's offset
483   * @return size freed
484   */
485  public synchronized int freeBlock(long offset, int length) {
486    int bucketNo = (int) (offset / bucketCapacity);
487    assert bucketNo >= 0 && bucketNo < buckets.length;
488    Bucket targetBucket = buckets[bucketNo];
489    bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset, length);
490    usedSize -= targetBucket.getItemAllocationSize();
491    return targetBucket.getItemAllocationSize();
492  }
493
494  public int sizeIndexOfAllocation(long offset) {
495    int bucketNo = (int) (offset / bucketCapacity);
496    assert bucketNo >= 0 && bucketNo < buckets.length;
497    Bucket targetBucket = buckets[bucketNo];
498    return targetBucket.sizeIndex();
499  }
500
501  public int sizeOfAllocation(long offset) {
502    int bucketNo = (int) (offset / bucketCapacity);
503    assert bucketNo >= 0 && bucketNo < buckets.length;
504    Bucket targetBucket = buckets[bucketNo];
505    return targetBucket.getItemAllocationSize();
506  }
507
508  /**
509   * Statistics to give a glimpse into the distribution of BucketCache objects. Each configured
510   * bucket size, denoted by {@link BucketSizeInfo}, gets an IndexStatistic. A BucketSizeInfo
511   * allocates blocks of a configured size from claimed buckets. If you have a bucket size of 512k,
512   * the corresponding BucketSizeInfo will always allocate chunks of 512k at a time regardless of
513   * actual request.
514   * <p>
515   * Over time, as a BucketSizeInfo gets more allocations, it will claim more buckets from the total
516   * pool of completelyFreeBuckets. As blocks are freed from a BucketSizeInfo, those buckets may be
517   * returned to the completelyFreeBuckets pool.
518   * <p>
519   * The IndexStatistics help visualize how these buckets are currently distributed, through counts
520   * of items, bytes, and fullBuckets. Additionally, mismatches between block sizes and bucket sizes
521   * can manifest in inefficient cache usage. These typically manifest in three ways:
522   * <p>
523   * 1. Allocation failures, because block size is larger than max bucket size. These show up in
524   * logs and can be alleviated by adding larger bucket sizes if appropriate.<br>
525   * 2. Memory fragmentation, because blocks are typically smaller than the bucket size. See
526   * {@link #fragmentationBytes()} for details.<br>
527   * 3. Memory waste, because a bucket's itemSize is not a perfect divisor of bucketCapacity. see
528   * {@link #wastedBytes()} for details.<br>
529   */
530  static class IndexStatistics {
531    private long freeCount, usedCount, itemSize, totalCount, wastedBytes, fragmentationBytes;
532    private int fullBuckets, completelyFreeBuckets;
533
534    /**
535     * How many more items can be allocated from the currently claimed blocks of this bucket size
536     */
537    public long freeCount() {
538      return freeCount;
539    }
540
541    /**
542     * How many items are currently taking up space in this bucket size's buckets
543     */
544    public long usedCount() {
545      return usedCount;
546    }
547
548    /**
549     * Combined {@link #freeCount()} + {@link #usedCount()}
550     */
551    public long totalCount() {
552      return totalCount;
553    }
554
555    /**
556     * How many more bytes can be allocated from the currently claimed blocks of this bucket size
557     */
558    public long freeBytes() {
559      return freeCount * itemSize;
560    }
561
562    /**
563     * How many bytes are currently taking up space in this bucket size's buckets Note: If your
564     * items are less than the bucket size of this bucket, the actual used bytes by items will be
565     * lower than this value. But since a bucket size can only allocate items of a single size, this
566     * value is the true number of used bytes. The difference will be counted in
567     * {@link #fragmentationBytes()}.
568     */
569    public long usedBytes() {
570      return usedCount * itemSize;
571    }
572
573    /**
574     * Combined {@link #totalCount()} * {@link #itemSize()}
575     */
576    public long totalBytes() {
577      return totalCount * itemSize;
578    }
579
580    /**
581     * This bucket size can only allocate items of this size, even if the requested allocation size
582     * is smaller. The rest goes towards {@link #fragmentationBytes()}.
583     */
584    public long itemSize() {
585      return itemSize;
586    }
587
588    /**
589     * How many buckets have been completely filled by blocks for this bucket size. These buckets
590     * can't accept any more blocks unless some existing are freed.
591     */
592    public int fullBuckets() {
593      return fullBuckets;
594    }
595
596    /**
597     * How many buckets are currently claimed by this bucket size but as yet totally unused. These
598     * buckets are available for reallocation to other bucket sizes if those fill up.
599     */
600    public int completelyFreeBuckets() {
601      return completelyFreeBuckets;
602    }
603
604    /**
605     * If {@link #bucketCapacity} is not perfectly divisible by this {@link #itemSize()}, the
606     * remainder will be unusable by in buckets of this size. A high value here may be optimized by
607     * trying to choose bucket sizes which can better divide {@link #bucketCapacity}.
608     */
609    public long wastedBytes() {
610      return wastedBytes;
611    }
612
613    /**
614     * Every time you allocate blocks in these buckets where the block size is less than the bucket
615     * size, fragmentation increases by that difference. You can reduce fragmentation by lowering
616     * the bucket size so that it is closer to the typical block size. This may have the consequence
617     * of bumping some blocks to the next larger bucket size, so experimentation may be needed.
618     */
619    public long fragmentationBytes() {
620      return fragmentationBytes;
621    }
622
623    public IndexStatistics(long free, long used, long itemSize, int fullBuckets,
624      int completelyFreeBuckets, long wastedBytes, long fragmentationBytes) {
625      setTo(free, used, itemSize, fullBuckets, completelyFreeBuckets, wastedBytes,
626        fragmentationBytes);
627    }
628
629    public IndexStatistics() {
630      setTo(-1, -1, 0, 0, 0, 0, 0);
631    }
632
633    public void setTo(long free, long used, long itemSize, int fullBuckets,
634      int completelyFreeBuckets, long wastedBytes, long fragmentationBytes) {
635      this.itemSize = itemSize;
636      this.freeCount = free;
637      this.usedCount = used;
638      this.totalCount = free + used;
639      this.fullBuckets = fullBuckets;
640      this.completelyFreeBuckets = completelyFreeBuckets;
641      this.wastedBytes = wastedBytes;
642      this.fragmentationBytes = fragmentationBytes;
643    }
644  }
645
646  public Bucket[] getBuckets() {
647    return this.buckets;
648  }
649
650  void logDebugStatistics() {
651    if (!LOG.isDebugEnabled()) {
652      return;
653    }
654
655    IndexStatistics total = new IndexStatistics();
656    IndexStatistics[] stats = getIndexStatistics(total);
657    LOG.debug("Bucket allocator statistics follow:");
658    LOG.debug(
659      "  Free bytes={}; used bytes={}; total bytes={}; wasted bytes={}; fragmentation bytes={}; "
660        + "completelyFreeBuckets={}",
661      total.freeBytes(), total.usedBytes(), total.totalBytes(), total.wastedBytes(),
662      total.fragmentationBytes(), total.completelyFreeBuckets());
663    for (IndexStatistics s : stats) {
664      LOG.debug(
665        "  Object size {}; used={}; free={}; total={}; wasted bytes={}; fragmentation bytes={}, "
666          + "full buckets={}",
667        s.itemSize(), s.usedCount(), s.freeCount(), s.totalCount(), s.wastedBytes(),
668        s.fragmentationBytes(), s.fullBuckets());
669    }
670  }
671
672  IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
673    IndexStatistics[] stats = getIndexStatistics();
674    long totalfree = 0, totalused = 0, totalWasted = 0, totalFragmented = 0;
675    int fullBuckets = 0, completelyFreeBuckets = 0;
676
677    for (IndexStatistics stat : stats) {
678      totalfree += stat.freeBytes();
679      totalused += stat.usedBytes();
680      totalWasted += stat.wastedBytes();
681      totalFragmented += stat.fragmentationBytes();
682      fullBuckets += stat.fullBuckets();
683      completelyFreeBuckets += stat.completelyFreeBuckets();
684    }
685    grandTotal.setTo(totalfree, totalused, 1, fullBuckets, completelyFreeBuckets, totalWasted,
686      totalFragmented);
687    return stats;
688  }
689
690  IndexStatistics[] getIndexStatistics() {
691    IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
692    for (int i = 0; i < stats.length; ++i)
693      stats[i] = bucketSizeInfos[i].statistics();
694    return stats;
695  }
696
697  public int getBucketIndex(long offset) {
698    return (int) (offset / bucketCapacity);
699  }
700
701  /**
702   * Returns a set of indices of the buckets that are least filled excluding the offsets, we also
703   * the fully free buckets for the BucketSizes where everything is empty and they only have one
704   * completely free bucket as a reserved
705   * @param excludedBuckets the buckets that need to be excluded due to currently being in used
706   * @param bucketCount     max Number of buckets to return
707   * @return set of bucket indices which could be used for eviction
708   */
709  public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets, int bucketCount) {
710    Queue<Integer> queue = MinMaxPriorityQueue.<Integer> orderedBy(new Comparator<Integer>() {
711      @Override
712      public int compare(Integer left, Integer right) {
713        // We will always get instantiated buckets
714        return Float.compare(((float) buckets[left].usedCount) / buckets[left].itemCount,
715          ((float) buckets[right].usedCount) / buckets[right].itemCount);
716      }
717    }).maximumSize(bucketCount).create();
718
719    for (int i = 0; i < buckets.length; i++) {
720      if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
721      // Avoid the buckets that are the only buckets for a sizeIndex
722        bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1
723      ) {
724        queue.add(i);
725      }
726    }
727
728    Set<Integer> result = new HashSet<>(bucketCount);
729    result.addAll(queue);
730
731    return result;
732  }
733}