001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.nio.ByteBuffer;
022import java.util.Set;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ConcurrentSkipListSet;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicReference;
028import java.util.concurrent.locks.ReentrantLock;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ByteBufferExtendedCell;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.nio.RefCnt;
034import org.apache.hadoop.hbase.regionserver.CompactingMemStore.IndexType;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
040
041/**
042 * A memstore-local allocation buffer.
043 * <p>
044 * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) byte[] chunks
045 * from and then doles it out to threads that request slices into the array.
046 * <p>
047 * The purpose of this class is to combat heap fragmentation in the regionserver. By ensuring that
048 * all Cells in a given memstore refer only to large chunks of contiguous memory, we ensure that
049 * large blocks get freed up when the memstore is flushed.
050 * <p>
051 * Without the MSLAB, the byte array allocated during insertion end up interleaved throughout the
052 * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
053 * collection occurs.
054 * <p>
055 * TODO: we should probably benchmark whether word-aligning the allocations would provide a
056 * performance improvement - probably would speed up the Bytes.toLong/Bytes.toInt calls in KeyValue,
057 * but some of those are cached anyway. The chunks created by this MemStoreLAB can get pooled at
058 * {@link ChunkCreator}. When the Chunk comes from pool, it can be either an on heap or an off heap
059 * backed chunk. The chunks, which this MemStoreLAB creates on its own (when no chunk available from
060 * pool), those will be always on heap backed.
061 * <p>
062 * NOTE:if user requested to work with MSLABs (whether on- or off-heap), in
063 * {@link CompactingMemStore} ctor, the {@link CompactingMemStore#indexType} could only be
064 * {@link IndexType#CHUNK_MAP},that is to say the immutable segments using MSLABs are going to use
065 * {@link CellChunkMap} as their index.
066 */
067@InterfaceAudience.Private
068public class MemStoreLABImpl implements MemStoreLAB {
069
070  static final Logger LOG = LoggerFactory.getLogger(MemStoreLABImpl.class);
071
072  private AtomicReference<Chunk> currChunk = new AtomicReference<>();
073  // Lock to manage multiple handlers requesting for a chunk
074  private ReentrantLock lock = new ReentrantLock();
075
076  // A set of chunks contained by this memstore LAB
077  Set<Integer> chunks = new ConcurrentSkipListSet<Integer>();
078  private final int dataChunkSize;
079  private final int maxAlloc;
080  private final ChunkCreator chunkCreator;
081
082  // This flag is for closing this instance, its set when clearing snapshot of
083  // memstore
084  private final AtomicBoolean closed = new AtomicBoolean(false);;
085  // This flag is for reclaiming chunks. Its set when putting chunks back to
086  // pool
087  private final AtomicBoolean reclaimed = new AtomicBoolean(false);
088  /**
089   * Its initial value is 1, so it is one bigger than the current count of open scanners which
090   * reading data from this MemStoreLAB.
091   */
092  private final RefCnt refCnt;
093
094  // Used in testing
095  public MemStoreLABImpl() {
096    this(new Configuration());
097  }
098
099  public MemStoreLABImpl(Configuration conf) {
100    dataChunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
101    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
102    this.chunkCreator = ChunkCreator.getInstance();
103    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
104    Preconditions.checkArgument(maxAlloc <= dataChunkSize,
105      MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
106
107    this.refCnt = RefCnt.create(() -> {
108      recycleChunks();
109    });
110
111  }
112
113  @Override
114  public ExtendedCell copyCellInto(ExtendedCell cell) {
115    // See head of copyBBECellInto for how it differs from copyCellInto
116    return (cell instanceof ByteBufferExtendedCell)
117      ? copyBBECellInto((ByteBufferExtendedCell) cell, maxAlloc)
118      : copyCellInto(cell, maxAlloc);
119  }
120
121  /**
122   * When a cell's size is too big (bigger than maxAlloc), copyCellInto does not allocate it on
123   * MSLAB. Since the process of flattening to CellChunkMap assumes that all cells are allocated on
124   * MSLAB, during this process, the big cells are copied into MSLAB using this method.
125   */
126  @Override
127  public ExtendedCell forceCopyOfBigCellInto(ExtendedCell cell) {
128    int size = Segment.getCellLength(cell);
129    Preconditions.checkArgument(size >= 0, "negative size");
130    if (size + ChunkCreator.SIZEOF_CHUNK_HEADER <= dataChunkSize) {
131      // Using copyCellInto for cells which are bigger than the original maxAlloc
132      return copyCellInto(cell, dataChunkSize);
133    } else {
134      Chunk c = getNewExternalChunk(size);
135      int allocOffset = c.alloc(size);
136      return copyToChunkCell(cell, c.getData(), allocOffset, size);
137    }
138  }
139
140  /**
141   * Mostly a duplicate of {@link #copyCellInto(Cell, int)}} done for perf sake. It presumes
142   * ByteBufferExtendedCell instead of Cell so we deal with a specific type rather than the super
143   * generic Cell. Removes instanceof checks. Shrinkage is enough to make this inline where before
144   * it was too big. Uses less CPU. See HBASE-20875 for evidence.
145   * @see #copyCellInto(Cell, int)
146   */
147  private ExtendedCell copyBBECellInto(ByteBufferExtendedCell cell, int maxAlloc) {
148    int size = cell.getSerializedSize();
149    Preconditions.checkArgument(size >= 0, "negative size");
150    // Callers should satisfy large allocations from JVM heap so limit fragmentation.
151    if (size > maxAlloc) {
152      return null;
153    }
154    Chunk c = null;
155    int allocOffset = 0;
156    while (true) {
157      // Try to get the chunk
158      c = getOrMakeChunk();
159      // We may get null because the some other thread succeeded in getting the lock
160      // and so the current thread has to try again to make its chunk or grab the chunk
161      // that the other thread created
162      // Try to allocate from this chunk
163      if (c != null) {
164        allocOffset = c.alloc(size);
165        if (allocOffset != -1) {
166          // We succeeded - this is the common case - small alloc
167          // from a big buffer
168          break;
169        }
170        // not enough space!
171        // try to retire this chunk
172        tryRetireChunk(c);
173      }
174    }
175    return copyBBECToChunkCell(cell, c.getData(), allocOffset, size);
176  }
177
178  /**
179   * @see #copyBBECellInto(ByteBufferExtendedCell, int)
180   */
181  private ExtendedCell copyCellInto(ExtendedCell cell, int maxAlloc) {
182    int size = Segment.getCellLength(cell);
183    Preconditions.checkArgument(size >= 0, "negative size");
184    // Callers should satisfy large allocations directly from JVM since they
185    // don't cause fragmentation as badly.
186    if (size > maxAlloc) {
187      return null;
188    }
189    Chunk c = null;
190    int allocOffset = 0;
191    while (true) {
192      // Try to get the chunk
193      c = getOrMakeChunk();
194      // we may get null because the some other thread succeeded in getting the lock
195      // and so the current thread has to try again to make its chunk or grab the chunk
196      // that the other thread created
197      // Try to allocate from this chunk
198      if (c != null) {
199        allocOffset = c.alloc(size);
200        if (allocOffset != -1) {
201          // We succeeded - this is the common case - small alloc
202          // from a big buffer
203          break;
204        }
205        // not enough space!
206        // try to retire this chunk
207        tryRetireChunk(c);
208      }
209    }
210    return copyToChunkCell(cell, c.getData(), allocOffset, size);
211  }
212
213  /**
214   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
215   * out of it
216   * @see #copyBBECToChunkCell(ByteBufferExtendedCell, ByteBuffer, int, int)
217   */
218  private static ExtendedCell copyToChunkCell(ExtendedCell cell, ByteBuffer buf, int offset,
219    int len) {
220    int tagsLen = cell.getTagsLength();
221    cell.write(buf, offset);
222    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
223  }
224
225  /**
226   * Clone the passed cell by copying its data into the passed buf and create a cell with a chunkid
227   * out of it
228   * @see #copyToChunkCell(Cell, ByteBuffer, int, int)
229   */
230  private static ExtendedCell copyBBECToChunkCell(ByteBufferExtendedCell cell, ByteBuffer buf,
231    int offset, int len) {
232    int tagsLen = cell.getTagsLength();
233    cell.write(buf, offset);
234    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
235  }
236
237  private static ExtendedCell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
238    long sequenceId) {
239    // TODO : write the seqid here. For writing seqId we should create a new cell type so
240    // that seqId is not used as the state
241    if (tagsLen == 0) {
242      // When tagsLen is 0, make a NoTagsByteBufferKeyValue version. This is an optimized class
243      // which directly return tagsLen as 0. So we avoid parsing many length components in
244      // reading the tagLength stored in the backing buffer. The Memstore addition of every Cell
245      // call getTagsLength().
246      return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
247    } else {
248      return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
249    }
250  }
251
252  /**
253   * Close this instance since it won't be used any more, try to put the chunks back to pool
254   */
255  @Override
256  public void close() {
257    if (!this.closed.compareAndSet(false, true)) {
258      return;
259    }
260    // We could put back the chunks to pool for reusing only when there is no
261    // opening scanner which will read their data
262    this.refCnt.release();
263  }
264
265  @RestrictedApi(explanation = "Should only be called in tests", link = "",
266      allowedOnPath = ".*/src/test/.*")
267  int getRefCntValue() {
268    return this.refCnt.refCnt();
269  }
270
271  /**
272   * Called when opening a scanner on the data of this MemStoreLAB
273   */
274  @Override
275  public void incScannerCount() {
276    this.refCnt.retain();
277  }
278
279  /**
280   * Called when closing a scanner on the data of this MemStoreLAB
281   */
282  @Override
283  public void decScannerCount() {
284    this.refCnt.release();
285  }
286
287  private void recycleChunks() {
288    if (reclaimed.compareAndSet(false, true)) {
289      chunkCreator.putbackChunks(chunks);
290      chunks.clear();
291    }
292  }
293
294  /**
295   * Try to retire the current chunk if it is still <code>c</code>. Postcondition is that
296   * curChunk.get() != c
297   * @param c the chunk to retire
298   */
299  private void tryRetireChunk(Chunk c) {
300    currChunk.compareAndSet(c, null);
301    // If the CAS succeeds, that means that we won the race
302    // to retire the chunk. We could use this opportunity to
303    // update metrics on external fragmentation.
304    //
305    // If the CAS fails, that means that someone else already
306    // retired the chunk for us.
307  }
308
309  /**
310   * Get the current chunk, or, if there is no current chunk, allocate a new one from the JVM.
311   */
312  private Chunk getOrMakeChunk() {
313    // Try to get the chunk
314    Chunk c = currChunk.get();
315    if (c != null) {
316      return c;
317    }
318    // No current chunk, so we want to allocate one. We race
319    // against other allocators to CAS in an uninitialized chunk
320    // (which is cheap to allocate)
321    if (lock.tryLock()) {
322      try {
323        // once again check inside the lock
324        c = currChunk.get();
325        if (c != null) {
326          return c;
327        }
328        c = this.chunkCreator.getChunk();
329        if (c != null) {
330          // set the curChunk. No need of CAS as only one thread will be here
331          currChunk.set(c);
332          chunks.add(c.getId());
333          return c;
334        }
335      } finally {
336        lock.unlock();
337      }
338    }
339    return null;
340  }
341
342  /*
343   * Returning a new pool chunk, without replacing current chunk, meaning MSLABImpl does not make
344   * the returned chunk as CurChunk. The space on this chunk will be allocated externally. The
345   * interface is only for external callers.
346   */
347  @Override
348  public Chunk getNewExternalChunk(ChunkCreator.ChunkType chunkType) {
349    switch (chunkType) {
350      case INDEX_CHUNK:
351      case DATA_CHUNK:
352        Chunk c = this.chunkCreator.getChunk(chunkType);
353        chunks.add(c.getId());
354        return c;
355      case JUMBO_CHUNK: // a jumbo chunk doesn't have a fixed size
356      default:
357        return null;
358    }
359  }
360
361  /*
362   * Returning a new chunk, without replacing current chunk, meaning MSLABImpl does not make the
363   * returned chunk as CurChunk. The space on this chunk will be allocated externally. The interface
364   * is only for external callers. Chunks from pools are not allocated from here, since they have
365   * fixed sizes
366   */
367  @Override
368  public Chunk getNewExternalChunk(int size) {
369    int allocSize = size + ChunkCreator.SIZEOF_CHUNK_HEADER;
370    if (allocSize <= ChunkCreator.getInstance().getChunkSize()) {
371      return getNewExternalChunk(ChunkCreator.ChunkType.DATA_CHUNK);
372    } else {
373      Chunk c = this.chunkCreator.getJumboChunk(size);
374      chunks.add(c.getId());
375      return c;
376    }
377  }
378
379  @Override
380  public boolean isOnHeap() {
381    return !isOffHeap();
382  }
383
384  @Override
385  public boolean isOffHeap() {
386    return this.chunkCreator.isOffheap();
387  }
388
389  Chunk getCurrentChunk() {
390    return currChunk.get();
391  }
392
393  BlockingQueue<Chunk> getPooledChunks() {
394    BlockingQueue<Chunk> pooledChunks = new LinkedBlockingQueue<>();
395    for (Integer id : this.chunks) {
396      Chunk chunk = chunkCreator.getChunk(id);
397      if (chunk != null && chunk.isFromPool()) {
398        pooledChunks.add(chunk);
399      }
400    }
401    return pooledChunks;
402  }
403
404  Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
405    int i = 0;
406    for (Integer id : chunksId) {
407      if (chunkCreator.isChunkInPool(id)) {
408        i++;
409      }
410    }
411    return i;
412  }
413
414  boolean isReclaimed() {
415    return reclaimed.get();
416  }
417
418  boolean isClosed() {
419    return closed.get();
420  }
421}