001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.CHUNK_SIZE_KEY;
021import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.MAX_ALLOC_KEY;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.lang.management.ManagementFactory;
027import java.nio.ByteBuffer;
028import java.util.ArrayList;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034import java.util.concurrent.ThreadLocalRandom;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.ByteBufferKeyValue;
038import org.apache.hadoop.hbase.Cell;
039import org.apache.hadoop.hbase.ExtendedCell;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseConfiguration;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.MultithreadedTestUtil;
044import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
045import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
046import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
051import org.junit.AfterClass;
052import org.junit.BeforeClass;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056
057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
058import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
059import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
060import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
061
062@Category({ RegionServerTests.class, MediumTests.class })
063public class TestMemStoreLAB {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestMemStoreLAB.class);
068
069  private final static Configuration conf = new Configuration();
070
071  private static final byte[] rk = Bytes.toBytes("r1");
072  private static final byte[] cf = Bytes.toBytes("f");
073  private static final byte[] q = Bytes.toBytes("q");
074
075  @BeforeClass
076  public static void setUpBeforeClass() throws Exception {
077    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
078      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
079  }
080
081  @AfterClass
082  public static void tearDownAfterClass() throws Exception {
083    long globalMemStoreLimit =
084      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
085        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
086    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
087      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
088  }
089
090  /**
091   * Test a bunch of random allocations
092   */
093  @Test
094  public void testLABRandomAllocation() {
095    MemStoreLAB mslab = new MemStoreLABImpl();
096    int expectedOff = 0;
097    ByteBuffer lastBuffer = null;
098    int lastChunkId = -1;
099    // 100K iterations by 0-1K alloc -> 50MB expected
100    // should be reasonable for unit test and also cover wraparound
101    // behavior
102    Random rand = ThreadLocalRandom.current();
103    for (int i = 0; i < 100000; i++) {
104      int valSize = rand.nextInt(3);
105      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
106      int size = kv.getSerializedSize();
107      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
108      if (newKv.getBuffer() != lastBuffer) {
109        // since we add the chunkID at the 0th offset of the chunk and the
110        // chunkid is an int we need to account for those 4 bytes
111        expectedOff = Bytes.SIZEOF_INT;
112        lastBuffer = newKv.getBuffer();
113        int chunkId = newKv.getBuffer().getInt(0);
114        assertTrue("chunkid should be different", chunkId != lastChunkId);
115        lastChunkId = chunkId;
116      }
117      assertEquals(expectedOff, newKv.getOffset());
118      assertTrue("Allocation overruns buffer",
119        newKv.getOffset() + size <= newKv.getBuffer().capacity());
120      expectedOff += size;
121    }
122  }
123
124  @Test
125  public void testLABLargeAllocation() {
126    MemStoreLAB mslab = new MemStoreLABImpl();
127    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
128    Cell newCell = mslab.copyCellInto(kv);
129    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
130  }
131
132  /**
133   * Test allocation from lots of threads, making sure the results don't overlap in any way
134   */
135  @Test
136  public void testLABThreading() throws Exception {
137    Configuration conf = new Configuration();
138    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
139
140    final AtomicInteger totalAllocated = new AtomicInteger();
141
142    final MemStoreLAB mslab = new MemStoreLABImpl();
143    List<List<AllocRecord>> allocations = Lists.newArrayList();
144
145    for (int i = 0; i < 10; i++) {
146      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
147      allocations.add(allocsByThisThread);
148
149      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
150        @Override
151        public void doAnAction() throws Exception {
152          int valSize = ThreadLocalRandom.current().nextInt(3);
153          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
154          int size = kv.getSerializedSize();
155          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
156          totalAllocated.addAndGet(size);
157          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
158        }
159      };
160      ctx.addThread(t);
161    }
162
163    ctx.startThreads();
164    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
165      Thread.sleep(10);
166    }
167    ctx.stop();
168    // Partition the allocations by the actual byte[] they point into,
169    // make sure offsets are unique for each chunk
170    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap();
171
172    int sizeCounted = 0;
173    for (AllocRecord rec : Iterables.concat(allocations)) {
174      sizeCounted += rec.size;
175      if (rec.size == 0) {
176        continue;
177      }
178      Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc);
179      if (mapForThisByteArray == null) {
180        mapForThisByteArray = Maps.newTreeMap();
181        mapsByChunk.put(rec.alloc, mapForThisByteArray);
182      }
183      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
184      assertNull("Already had an entry " + oldVal + " for allocation " + rec, oldVal);
185    }
186    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());
187
188    // Now check each byte array to make sure allocations don't overlap
189    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
190      // since we add the chunkID at the 0th offset of the chunk and the
191      // chunkid is an int we need to account for those 4 bytes
192      int expectedOff = Bytes.SIZEOF_INT;
193      for (AllocRecord alloc : allocsInChunk.values()) {
194        assertEquals(expectedOff, alloc.offset);
195        assertTrue("Allocation overruns buffer",
196          alloc.offset + alloc.size <= alloc.alloc.capacity());
197        expectedOff += alloc.size;
198      }
199    }
200  }
201
202  /**
203   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
204   * there's no memory leak (HBASE-16195)
205   * @throws Exception if any error occurred
206   */
207  @Test
208  public void testLABChunkQueue() throws Exception {
209    ChunkCreator oldInstance = null;
210    try {
211      MemStoreLABImpl mslab = new MemStoreLABImpl();
212      // by default setting, there should be no chunks initialized in the pool
213      assertTrue(mslab.getPooledChunks().isEmpty());
214      oldInstance = ChunkCreator.instance;
215      ChunkCreator.instance = null;
216      // reset mslab with chunk pool
217      Configuration conf = HBaseConfiguration.create();
218      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
219      // set chunk size to default max alloc size, so we could easily trigger chunk retirement
220      conf.setLong(CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
221      // reconstruct mslab
222      long globalMemStoreLimit =
223        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
224          * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
225      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, globalMemStoreLimit, 0.1f,
226        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
227        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
228      ChunkCreator.clearDisableFlag();
229      mslab = new MemStoreLABImpl(conf);
230      // launch multiple threads to trigger frequent chunk retirement
231      List<Thread> threads = new ArrayList<>();
232      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
233        new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
234      for (int i = 0; i < 10; i++) {
235        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
236      }
237      for (Thread thread : threads) {
238        thread.start();
239      }
240      // let it run for some time
241      Thread.sleep(1000);
242      for (Thread thread : threads) {
243        thread.interrupt();
244      }
245      boolean threadsRunning = true;
246      boolean alive = false;
247      while (threadsRunning) {
248        alive = false;
249        for (Thread thread : threads) {
250          if (thread.isAlive()) {
251            alive = true;
252            break;
253          }
254        }
255        if (!alive) {
256          threadsRunning = false;
257        }
258      }
259      // none of the chunkIds would have been returned back
260      assertTrue("All the chunks must have been cleared",
261        ChunkCreator.instance.numberOfMappedChunks() != 0);
262      Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
263      int pooledChunksNum = mslab.getPooledChunks().size();
264      // close the mslab
265      mslab.close();
266      // make sure all chunks where reclaimed back to pool
267      int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
268      assertTrue(
269        "All chunks in chunk queue should be reclaimed or removed"
270          + " after mslab closed but actually: " + (pooledChunksNum - queueLength),
271        pooledChunksNum - queueLength == 0);
272    } finally {
273      ChunkCreator.instance = oldInstance;
274    }
275  }
276
277  /**
278   * Test cell with right length, which constructed by testForceCopyOfBigCellInto. (HBASE-26467)
279   */
280  @Test
281  public void testForceCopyOfBigCellInto() {
282    Configuration conf = HBaseConfiguration.create();
283    int chunkSize = ChunkCreator.getInstance().getChunkSize();
284    conf.setInt(CHUNK_SIZE_KEY, chunkSize);
285    conf.setInt(MAX_ALLOC_KEY, chunkSize / 2);
286
287    MemStoreLABImpl mslab = new MemStoreLABImpl(conf);
288    byte[] row = Bytes.toBytes("row");
289    byte[] columnFamily = Bytes.toBytes("columnFamily");
290    byte[] qualify = Bytes.toBytes("qualify");
291    byte[] smallValue = new byte[chunkSize / 2];
292    byte[] bigValue = new byte[chunkSize];
293    KeyValue smallKV =
294      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), smallValue);
295
296    assertEquals(smallKV.getSerializedSize(),
297      mslab.forceCopyOfBigCellInto(smallKV).getSerializedSize());
298
299    KeyValue bigKV =
300      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), bigValue);
301    assertEquals(bigKV.getSerializedSize(),
302      mslab.forceCopyOfBigCellInto(bigKV).getSerializedSize());
303
304    /**
305     * Add test by HBASE-26576,all the chunks are in {@link ChunkCreator#chunkIdMap}
306     */
307    assertTrue(mslab.chunks.size() == 2);
308    Chunk dataChunk = null;
309    Chunk jumboChunk = null;
310
311    for (Integer chunkId : mslab.chunks) {
312      Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
313      assertTrue(chunk != null);
314      if (chunk.getChunkType() == ChunkType.JUMBO_CHUNK) {
315        jumboChunk = chunk;
316      } else if (chunk.getChunkType() == ChunkType.DATA_CHUNK) {
317        dataChunk = chunk;
318      }
319    }
320
321    assertTrue(dataChunk != null);
322    assertTrue(jumboChunk != null);
323
324    mslab.close();
325    /**
326     * After mslab close, jumboChunk is removed from {@link ChunkCreator#chunkIdMap} but because
327     * dataChunk is recycled to pool so it is still in {@link ChunkCreator#chunkIdMap}.
328     */
329    assertTrue(ChunkCreator.getInstance().getChunk(jumboChunk.getId()) == null);
330    assertTrue(!ChunkCreator.getInstance().isChunkInPool(jumboChunk.getId()));
331    assertTrue(ChunkCreator.getInstance().getChunk(dataChunk.getId()) == dataChunk);
332    assertTrue(ChunkCreator.getInstance().isChunkInPool(dataChunk.getId()));
333
334  }
335
336  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
337    ExtendedCell cellToCopyInto) {
338    Thread thread = new Thread() {
339      volatile boolean stopped = false;
340
341      @Override
342      public void run() {
343        while (!stopped) {
344          // keep triggering chunk retirement
345          mslab.copyCellInto(cellToCopyInto);
346        }
347      }
348
349      @Override
350      public void interrupt() {
351        this.stopped = true;
352      }
353    };
354    thread.setName(threadName);
355    thread.setDaemon(true);
356    return thread;
357  }
358
359  private static class AllocRecord implements Comparable<AllocRecord> {
360    private final ByteBuffer alloc;
361    private final int offset;
362    private final int size;
363
364    public AllocRecord(ByteBuffer alloc, int offset, int size) {
365      super();
366      this.alloc = alloc;
367      this.offset = offset;
368      this.size = size;
369    }
370
371    @Override
372    public int compareTo(AllocRecord e) {
373      if (alloc != e.alloc) {
374        throw new RuntimeException("Can only compare within a particular array");
375      }
376      return Ints.compare(this.offset, e.offset);
377    }
378
379    @Override
380    public String toString() {
381      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
382    }
383  }
384}