/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.CHUNK_SIZE_KEY;
import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.MAX_ALLOC_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

/**
 * Tests for {@link MemStoreLABImpl}: allocation offsets within chunks, rejection of oversized
 * cells, concurrent allocation, chunk reclamation on close and forced copies of big cells.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestMemStoreLAB {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMemStoreLAB.class);

  private final static Configuration conf = new Configuration();

  private static final byte[] rk = Bytes.toBytes("r1");
  private static final byte[] cf = Bytes.toBytes("f");
  private static final byte[] q = Bytes.toBytes("q");

  /**
   * Initializes the global {@link ChunkCreator} with a 1 KB chunk size so that the tests roll over
   * to new chunks very frequently.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  }

  /**
   * Re-initializes the global {@link ChunkCreator} with the default chunk size and a memstore
   * limit derived from the JVM max heap, undoing the small-chunk setup used by this class.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    long globalMemStoreLimit =
      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  }

  /**
   * Test a bunch of random allocations
   */
  @Test
  public void testLABRandomAllocation() {
    MemStoreLAB mslab = new MemStoreLABImpl();
    int expectedOff = 0;
    ByteBuffer lastBuffer = null;
    int lastChunkId = -1;
    // 100K small cells (value of 0-2 bytes) against the 1 KB chunks configured in
    // setUpBeforeClass: cheap enough for a unit test while still rolling over to a new chunk
    // many times.
    Random rand = ThreadLocalRandom.current();
    for (int i = 0; i < 100000; i++) {
      int valSize = rand.nextInt(3);
      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
      int size = kv.getSerializedSize();
      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
      if (newKv.getBuffer() != lastBuffer) {
        // since we add the chunkID at the 0th offset of the chunk and the
        // chunkid is an int we need to account for those 4 bytes
        expectedOff = Bytes.SIZEOF_INT;
        lastBuffer = newKv.getBuffer();
        int chunkId = newKv.getBuffer().getInt(0);
        assertTrue("chunkid should be different", chunkId != lastChunkId);
        lastChunkId = chunkId;
      }
      assertEquals(expectedOff, newKv.getOffset());
      assertTrue("Allocation overruns buffer",
        newKv.getOffset() + size <= newKv.getBuffer().capacity());
      expectedOff += size;
    }
  }

  /**
   * A cell with a 2 MB value must not be copied into the LAB; {@code copyCellInto} is expected to
   * return {@code null} for it.
   */
  @Test
  public void testLABLargeAllocation() {
    MemStoreLAB mslab = new MemStoreLABImpl();
    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
    Cell newCell = mslab.copyCellInto(kv);
    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
  }

  /**
   * Test allocation from lots of threads, making sure the results don't overlap in any way
   */
  @Test
  public void testLABThreading() throws Exception {
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalAllocated = new AtomicInteger();

    final MemStoreLAB mslab = new MemStoreLABImpl();
    List<List<AllocRecord>> allocations = Lists.newArrayList();

    for (int i = 0; i < 10; i++) {
      // Each list is appended to by exactly one worker thread and only read after ctx.stop().
      final List<AllocRecord> allocsByThisThread = Lists.newArrayList();
      allocations.add(allocsByThisThread);

      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          int valSize = ThreadLocalRandom.current().nextInt(3);
          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
          int size = kv.getSerializedSize();
          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
          totalAllocated.addAndGet(size);
          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
        }
      };
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();

    // Partition the allocations by the actual buffer they point into and make sure offsets are
    // unique within each chunk. The map is identity-based on purpose: ByteBuffer.equals/hashCode
    // are content-based, while the rest of this class (and AllocRecord.compareTo) distinguishes
    // chunks by buffer identity.
    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = new IdentityHashMap<>();

    int sizeCounted = 0;
    for (AllocRecord rec : Iterables.concat(allocations)) {
      sizeCounted += rec.size;
      if (rec.size == 0) {
        continue;
      }
      // TreeMap keeps the records of a chunk ordered by offset for the contiguity check below.
      Map<Integer, AllocRecord> mapForThisChunk =
        mapsByChunk.computeIfAbsent(rec.alloc, k -> Maps.newTreeMap());
      AllocRecord oldVal = mapForThisChunk.put(rec.offset, rec);
      assertNull("Already had an entry " + oldVal + " for allocation " + rec, oldVal);
    }
    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());

    // Now check each chunk to make sure allocations don't overlap
    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
      // since we add the chunkID at the 0th offset of the chunk and the
      // chunkid is an int we need to account for those 4 bytes
      int expectedOff = Bytes.SIZEOF_INT;
      for (AllocRecord alloc : allocsInChunk.values()) {
        assertEquals(expectedOff, alloc.offset);
        assertTrue("Allocation overruns buffer",
          alloc.offset + alloc.size <= alloc.alloc.capacity());
        expectedOff += alloc.size;
      }
    }
  }

  /**
   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
   * there's no memory leak (HBASE-16195)
   * @throws Exception if any error occurred
   */
  @Test
  public void testLABChunkQueue() throws Exception {
    // Capture the current creator BEFORE anything can fail, so the finally block always restores
    // a valid instance instead of overwriting it with null.
    ChunkCreator oldInstance = ChunkCreator.instance;
    try {
      MemStoreLABImpl mslab = new MemStoreLABImpl();
      // by default setting, there should be no chunks initialized in the pool
      assertTrue(mslab.getPooledChunks().isEmpty());
      ChunkCreator.instance = null;
      // reset mslab with chunk pool
      Configuration conf = HBaseConfiguration.create();
      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
      // set chunk size to default max alloc size, so we could easily trigger chunk retirement
      conf.setLong(CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
      // reconstruct mslab
      long globalMemStoreLimit =
        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
          * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, globalMemStoreLimit, 0.1f,
        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
        MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
      ChunkCreator.clearDisableFlag();
      mslab = new MemStoreLABImpl(conf);
      // launch multiple threads to trigger frequent chunk retirement
      List<Thread> threads = new ArrayList<>();
      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
      for (int i = 0; i < 10; i++) {
        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
      }
      for (Thread thread : threads) {
        thread.start();
      }
      // let it run for some time
      Thread.sleep(1000);
      // interrupt() is overridden by the worker threads to raise their stop flag
      for (Thread thread : threads) {
        thread.interrupt();
      }
      // wait for every worker to finish without burning CPU in a spin loop
      for (Thread thread : threads) {
        thread.join();
      }
      // none of the chunkIds would have been returned back yet, so chunks must still be mapped
      assertTrue("Chunks should still be mapped before the mslab is closed",
        ChunkCreator.instance.numberOfMappedChunks() != 0);
      Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks);
      int pooledChunksNum = mslab.getPooledChunks().size();
      // close the mslab
      mslab.close();
      // make sure all chunks were reclaimed back to pool
      int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds);
      assertEquals(
        "All chunks in chunk queue should be reclaimed or removed"
          + " after mslab closed but actually: " + (pooledChunksNum - queueLength),
        0, pooledChunksNum - queueLength);
    } finally {
      ChunkCreator.instance = oldInstance;
    }
  }

  /**
   * Test that cells constructed by {@code forceCopyOfBigCellInto} have the right length
   * (HBASE-26467), and that the chunks backing them are tracked and released correctly
   * (HBASE-26576).
   */
  @Test
  public void testForceCopyOfBigCellInto() {
    Configuration conf = HBaseConfiguration.create();
    int chunkSize = ChunkCreator.getInstance().getChunkSize();
    conf.setInt(CHUNK_SIZE_KEY, chunkSize);
    conf.setInt(MAX_ALLOC_KEY, chunkSize / 2);

    MemStoreLABImpl mslab = new MemStoreLABImpl(conf);
    byte[] row = Bytes.toBytes("row");
    byte[] columnFamily = Bytes.toBytes("columnFamily");
    byte[] qualify = Bytes.toBytes("qualify");
    byte[] smallValue = new byte[chunkSize / 2];
    byte[] bigValue = new byte[chunkSize];
    KeyValue smallKV =
      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), smallValue);

    assertEquals(smallKV.getSerializedSize(),
      mslab.forceCopyOfBigCellInto(smallKV).getSerializedSize());

    KeyValue bigKV =
      new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), bigValue);
    assertEquals(bigKV.getSerializedSize(),
      mslab.forceCopyOfBigCellInto(bigKV).getSerializedSize());

    // Added by HBASE-26576: all the chunks are in ChunkCreator#chunkIdMap
    assertEquals(2, mslab.chunks.size());
    Chunk dataChunk = null;
    Chunk jumboChunk = null;

    for (Integer chunkId : mslab.chunks) {
      Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
      assertNotNull(chunk);
      if (chunk.getChunkType() == ChunkType.JUMBO_CHUNK) {
        jumboChunk = chunk;
      } else if (chunk.getChunkType() == ChunkType.DATA_CHUNK) {
        dataChunk = chunk;
      }
    }

    assertNotNull(dataChunk);
    assertNotNull(jumboChunk);

    mslab.close();
    // After mslab close, jumboChunk is removed from ChunkCreator#chunkIdMap but because
    // dataChunk is recycled to pool it is still in ChunkCreator#chunkIdMap.
    assertNull(ChunkCreator.getInstance().getChunk(jumboChunk.getId()));
    assertFalse(ChunkCreator.getInstance().isChunkInPool(jumboChunk.getId()));
    assertSame(dataChunk, ChunkCreator.getInstance().getChunk(dataChunk.getId()));
    assertTrue(ChunkCreator.getInstance().isChunkInPool(dataChunk.getId()));
  }

  /**
   * Builds (but does not start) a daemon thread that repeatedly copies {@code cellToCopyInto}
   * into {@code mslab} until it is asked to stop.
   * <p>
   * The returned thread overrides {@link Thread#interrupt()} to only raise its stop flag; it does
   * not delegate to the real interrupt, so calling {@code interrupt()} on it is the way to make
   * the loop exit.
   * @param mslab          the LAB to allocate from
   * @param threadName     name given to the created thread
   * @param cellToCopyInto the cell copied into the LAB on every iteration
   * @return the configured, not yet started thread
   */
  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
    ExtendedCell cellToCopyInto) {
    Thread thread = new Thread() {
      volatile boolean stopped = false;

      @Override
      public void run() {
        while (!stopped) {
          // keep triggering chunk retirement
          mslab.copyCellInto(cellToCopyInto);
        }
      }

      @Override
      public void interrupt() {
        this.stopped = true;
      }
    };
    thread.setName(threadName);
    thread.setDaemon(true);
    return thread;
  }

  /**
   * Record of a single allocation: the buffer it landed in, its offset within that buffer and its
   * size in bytes. Records are ordered by offset and may only be compared when they refer to the
   * very same buffer instance.
   */
  private static class AllocRecord implements Comparable<AllocRecord> {
    private final ByteBuffer alloc;
    private final int offset;
    private final int size;

    public AllocRecord(ByteBuffer alloc, int offset, int size) {
      this.alloc = alloc;
      this.offset = offset;
      this.size = size;
    }

    @Override
    public int compareTo(AllocRecord e) {
      // identity comparison on purpose: records from different buffers are not comparable
      if (alloc != e.alloc) {
        throw new RuntimeException("Can only compare within a particular array");
      }
      return Ints.compare(this.offset, e.offset);
    }

    @Override
    public String toString() {
      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
    }
  }
}