/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;

/**
 * Static helpers shared by block cache tests: block generators plus a handful of canned
 * single-threaded and multi-threaded scenarios that exercise a {@link BlockCache}.
 */
public class CacheTestUtils {

  /** Value passed to {@code withIncludesMvcc} when building the context of generated blocks. */
  private static final boolean INCLUDES_MEMSTORE_TS = true;

  /**
   * Just checks if heapsize grows when something is cached, and gets smaller when the same object
   * is evicted.
   * @param toBeTested cache under test; it is cast to {@link HeapSize}
   * @param blockSize  size, in bytes, of the single block that is generated and cached
   */
  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    /* When we cache something HeapSize should always increase */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    /* Post eviction, heapsize should be the same */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }

  /**
   * Has {@code numThreads} worker threads drain a shared queue of generated blocks. For each block
   * a worker caches it and immediately reads it back; a non-null read counts as a hit (the block
   * is then compared, evicted, and checked to be gone), a null read counts as a miss. The test
   * fails when {@code hits / (hits + misses)} is below {@code passingScore}.
   * @param toBeTested   cache under test
   * @param blockSize    size, in bytes, of every generated block
   * @param numThreads   number of worker threads to start
   * @param numQueries   number of blocks generated; each one is cached and queried by one worker
   * @param passingScore minimum acceptable hit ratio
   * @throws Exception if a worker thread fails or the waiting thread is interrupted
   */
  public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize,
    final int numThreads, final int numQueries, final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    // generateHFileBlocks takes (blockSize, numBlocks): one block per query.
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          if (blocksToTest.isEmpty()) {
            return;
          }
          HFileBlockPair ourBlock = blocksToTest.poll();
          // if we run out of blocks to test, then we should stop the tests.
          // (Another worker may have taken the last block between isEmpty() and poll().)
          if (ourBlock == null) {
            ctx.setStopFlag(true);
            return;
          }
          toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
          Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
          if (retrievedBlock != null) {
            assertEquals(ourBlock.block, retrievedBlock);
            toBeTested.evictBlock(ourBlock.blockName);
            hits.incrementAndGet();
            assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
          } else {
            miss.incrementAndGet();
          }
          totalQueries.incrementAndGet();
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    // Wait until the workers have taken every block, or the context asks us to stop.
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
    }
  }

  /**
   * Single-threaded smoke test: verifies the generated keys are initially absent, caches all
   * blocks, checks that every block still present equals what was stored, and finally tries to
   * cache the still-present blocks a second time.
   * @param toBeTested cache under test
   * @param blockSize  size, in bytes, of every generated block
   * @param numBlocks  number of blocks to generate
   * @throws Exception declared for callers; nothing in this method throws a checked exception
   *                   directly
   */
  public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks)
    throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
    // Confirm empty
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add blocks
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Check if all blocks are properly cached and contain the right
    // information, or the blocks are null.
    // MapMaker makes no guarantees when it will evict, so neither can we.
    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-add some duplicate blocks. Hope nothing breaks.
    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache won't throw exception when caching already cached
            // block
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException expected) {
        // expected: the cache rejected the duplicate. fail() throws an AssertionError, which is
        // not a RuntimeException, so a missing rejection is still reported.
      }
    }
  }

  /**
   * Repeatedly reads one key from {@code numThreads} threads while an extra thread evicts and
   * re-caches that key every 50 ms. Each non-null read must return the original bytes; a null read
   * makes the reader sleep 10 ms. Runs until at least {@code numQueries} reads have been counted
   * or the test context stops.
   * @param toBeTested cache under test
   * @param numThreads number of reader threads
   * @param numQueries total number of reads to wait for before stopping
   * @throws Exception if a worker thread fails or the waiting thread is interrupted
   */
  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
    throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned =
            (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
          if (returned != null) {
            assertArrayEquals(buf, returned.buf);
          } else {
            Thread.sleep(10);
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    // add a thread to periodically evict and re-cache the block
    final long blockEvictPeriod = 50;
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        toBeTested.evictBlock(key);
        toBeTested.cacheBlock(key, bac);
        Thread.sleep(blockEvictPeriod);
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }

  /**
   * Minimal {@link Cacheable} backed by a byte array. Its serialized form is a 4-byte length
   * followed by the raw bytes.
   */
  public static class ByteArrayCacheable implements Cacheable {

    /** Reads the length prefix and then that many bytes into a new {@link ByteArrayCacheable}. */
    static final CacheableDeserializer<Cacheable> blockDeserializer =
      new CacheableDeserializer<Cacheable>() {
        @Override
        public int getDeserializerIdentifier() {
          return deserializerIdentifier;
        }

        @Override
        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
          int len = b.getInt();
          // NOTE(review): the yield presumably widens race windows in concurrent tests - confirm.
          Thread.yield();
          byte[] buf = new byte[len];
          b.get(buf);
          return new ByteArrayCacheable(buf);
        }
      };

    final byte[] buf;

    /**
     * @param buf backing array; it is stored as-is, not copied
     */
    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      return 4L + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    /**
     * Writes the array length and then the array contents into {@code destination}, and rewinds
     * {@code destination} afterwards. {@code includeNextBlockMetadata} is ignored.
     */
    @Override
    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
      destination.putInt(buf.length);
      // NOTE(review): the yield presumably widens race windows in concurrent tests - confirm.
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    /** Identifier returned by {@link CacheableDeserializerIdManager} for the deserializer. */
    private static final int deserializerIdentifier;
    static {
      // Must run after blockDeserializer has been initialized above.
      deserializerIdentifier =
        CacheableDeserializerIdManager.registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }
  }

  /**
   * Generates {@code numBlocks} blocks of {@code blockSize} bytes, each under a distinct random
   * key with offset 0.
   * @param blockSize size, in bytes, of each block's buffer
   * @param numBlocks number of blocks to generate
   * @return the generated key/block pairs
   */
  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
    return generateBlocksForPath(blockSize, numBlocks, null);
  }

  /**
   * Generates {@code numBlocks} DATA blocks whose buffers are filled with random bytes and then
   * overwritten at the start with the block type, the two size fields and a random previous-block
   * offset.
   * <p>
   * When {@code path} is non-null every key uses {@code path.getName()} as its name and
   * {@code i * blockSize} as its offset. When it is null every key gets a distinct random name and
   * offset 0.
   * @param blockSize size, in bytes, of each block's buffer
   * @param numBlocks number of blocks to generate
   * @param path      file the blocks should appear to belong to, or {@code null} for random keys
   * @return the generated key/block pairs
   */
  public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = ThreadLocalRandom.current();
    HashSet<String> usedStrings = new HashSet<>();
    for (int i = 0; i < numBlocks; i++) {
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
      Bytes.random(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize;
      int uncompressedSizeWithoutHeader = blockSize;
      long prevBlockOffset = rand.nextLong();
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(INCLUDES_MEMSTORE_TS)
          .withIncludesTags(false).withCompression(Compression.Algorithm.NONE)
          .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL).build();
      HFileBlock generated =
        new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
          prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
          ByteBuffAllocator.HEAP);
      String key;
      long offset = 0;
      if (path != null) {
        key = path.getName();
        // Widen before multiplying so large i * blockSize products cannot overflow int.
        offset = (long) i * blockSize;
      } else {
        /* No conflicting keys */
        key = Long.toString(rand.nextLong());
        while (!usedStrings.add(key)) {
          key = Long.toString(rand.nextLong());
        }
      }
      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName = new BlockCacheKey(key, offset);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

  /** A cache key together with the block generated for it. */
  public static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;

    public BlockCacheKey getBlockName() {
      return this.blockName;
    }

    public HFileBlock getBlock() {
      return this.block;
    }
  }

  /**
   * Caches {@code blockToCache} under {@code key}, reads it back, serializes the returned block
   * into {@code destBuffer} and asserts that the result equals {@code expectedBuffer}. Fails the
   * test if the cache returns no block for {@code key}.
   * @param cache          cache under test
   * @param key            key to cache and look up
   * @param blockToCache   block to store
   * @param destBuffer     scratch buffer; it is cleared first and then receives the serialized
   *                       block
   * @param expectedBuffer expected serialized content
   */
  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
    Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
    destBuffer.clear();
    cache.cacheBlock(key, blockToCache);
    Cacheable actualBlock = cache.getBlock(key, false, false, false);
    if (actualBlock == null) {
      // Report a cache miss as a test failure rather than a NullPointerException below.
      fail("Cache returned no block for key " + key + " right after caching it");
    }
    try {
      actualBlock.serialize(destBuffer, true);
      assertEquals(expectedBuffer, destBuffer);
    } finally {
      // Release the reference count increased by getBlock.
      actualBlock.release();
    }
  }
}