001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.Arrays; 030import java.util.HashSet; 031import java.util.Random; 032import java.util.concurrent.ConcurrentLinkedQueue; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.atomic.AtomicInteger; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.hbase.HConstants; 038import org.apache.hadoop.hbase.MultithreadedTestUtil; 039import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 040import org.apache.hadoop.hbase.io.ByteBuffAllocator; 041import org.apache.hadoop.hbase.io.HeapSize; 042import org.apache.hadoop.hbase.io.compress.Compression; 043import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 044import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; 045import org.apache.hadoop.hbase.nio.ByteBuff; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.apache.hadoop.hbase.util.ChecksumType; 048 049public class CacheTestUtils { 050 051 private static final boolean includesMemstoreTS = true; 052 053 /** 054 * Just checks if heapsize grows when something is cached, and gets smaller when the same object 055 * is evicted 056 */ 057 058 public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) { 059 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1); 060 long heapSize = ((HeapSize) toBeTested).heapSize(); 061 toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block); 062 063 /* When we cache something HeapSize should always increase */ 064 assertTrue(heapSize < ((HeapSize) toBeTested).heapSize()); 065 066 toBeTested.evictBlock(blocks[0].blockName); 067 068 /* Post eviction, heapsize should be the same */ 069 assertEquals(heapSize, ((HeapSize) toBeTested).heapSize()); 070 } 071 072 public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize, 073 final int numThreads, final int numQueries, final double passingScore) throws Exception { 074 075 Configuration conf = new Configuration(); 076 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 077 078 final AtomicInteger totalQueries = new AtomicInteger(); 079 final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>(); 080 final AtomicInteger hits = new AtomicInteger(); 081 final AtomicInteger miss = new AtomicInteger(); 082 083 HFileBlockPair[] blocks = generateHFileBlocks(numQueries, blockSize); 084 blocksToTest.addAll(Arrays.asList(blocks)); 085 086 for (int i = 0; i < numThreads; i++) { 087 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 088 @Override 089 public void doAnAction() throws Exception { 090 if (!blocksToTest.isEmpty()) { 091 HFileBlockPair ourBlock = blocksToTest.poll(); 092 // if we run out of blocks to test, then we should stop the tests. 093 if (ourBlock == null) { 094 ctx.setStopFlag(true); 095 return; 096 } 097 toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block); 098 Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true); 099 if (retrievedBlock != null) { 100 assertEquals(ourBlock.block, retrievedBlock); 101 toBeTested.evictBlock(ourBlock.blockName); 102 hits.incrementAndGet(); 103 assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true)); 104 } else { 105 miss.incrementAndGet(); 106 } 107 totalQueries.incrementAndGet(); 108 } 109 } 110 }; 111 t.setDaemon(true); 112 ctx.addThread(t); 113 } 114 ctx.startThreads(); 115 while (!blocksToTest.isEmpty() && ctx.shouldRun()) { 116 Thread.sleep(10); 117 } 118 ctx.stop(); 119 if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) { 120 fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get()); 121 } 122 } 123 124 public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks) 125 throws Exception { 126 127 HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks); 128 // Confirm empty 129 for (HFileBlockPair block : blocks) { 130 assertNull(toBeTested.getBlock(block.blockName, true, false, true)); 131 } 132 133 // Add blocks 134 for (HFileBlockPair block : blocks) { 135 toBeTested.cacheBlock(block.blockName, block.block); 136 } 137 138 // Check if all blocks are properly cached and contain the right 139 // information, or the blocks are null. 140 // MapMaker makes no guarantees when it will evict, so neither can we. 141 142 for (HFileBlockPair block : blocks) { 143 HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true); 144 if (buf != null) { 145 assertEquals(block.block, buf); 146 } 147 } 148 149 // Re-add some duplicate blocks. Hope nothing breaks. 150 151 for (HFileBlockPair block : blocks) { 152 try { 153 if (toBeTested.getBlock(block.blockName, true, false, true) != null) { 154 toBeTested.cacheBlock(block.blockName, block.block); 155 if (!(toBeTested instanceof BucketCache)) { 156 // BucketCache won't throw exception when caching already cached 157 // block 158 fail("Cache should not allow re-caching a block"); 159 } 160 } 161 } catch (RuntimeException re) { 162 // expected 163 } 164 } 165 166 testConvertToJSON(toBeTested); 167 168 } 169 170 public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries) 171 throws Exception { 172 final BlockCacheKey key = new BlockCacheKey("key", 0); 173 final byte[] buf = new byte[5 * 1024]; 174 Arrays.fill(buf, (byte) 5); 175 176 final ByteArrayCacheable bac = new ByteArrayCacheable(buf); 177 Configuration conf = new Configuration(); 178 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 179 180 final AtomicInteger totalQueries = new AtomicInteger(); 181 toBeTested.cacheBlock(key, bac); 182 183 for (int i = 0; i < numThreads; i++) { 184 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 185 @Override 186 public void doAnAction() throws Exception { 187 ByteArrayCacheable returned = 188 (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true); 189 if (returned != null) { 190 assertArrayEquals(buf, returned.buf); 191 } else { 192 Thread.sleep(10); 193 } 194 totalQueries.incrementAndGet(); 195 } 196 }; 197 198 t.setDaemon(true); 199 ctx.addThread(t); 200 } 201 202 // add a thread to periodically evict and re-cache the block 203 final long blockEvictPeriod = 50; 204 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 205 @Override 206 public void doAnAction() throws Exception { 207 toBeTested.evictBlock(key); 208 toBeTested.cacheBlock(key, bac); 209 Thread.sleep(blockEvictPeriod); 210 } 211 }; 212 t.setDaemon(true); 213 ctx.addThread(t); 214 215 ctx.startThreads(); 216 while (totalQueries.get() < numQueries && ctx.shouldRun()) { 217 Thread.sleep(10); 218 } 219 ctx.stop(); 220 } 221 222 public static class ByteArrayCacheable implements Cacheable { 223 224 static final CacheableDeserializer<Cacheable> blockDeserializer = 225 new CacheableDeserializer<Cacheable>() { 226 @Override 227 public int getDeserializerIdentifier() { 228 return deserializerIdentifier; 229 } 230 231 @Override 232 public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException { 233 int len = b.getInt(); 234 Thread.yield(); 235 byte buf[] = new byte[len]; 236 b.get(buf); 237 return new ByteArrayCacheable(buf); 238 } 239 }; 240 241 final byte[] buf; 242 243 public ByteArrayCacheable(byte[] buf) { 244 this.buf = buf; 245 } 246 247 @Override 248 public long heapSize() { 249 return 4L + buf.length; 250 } 251 252 @Override 253 public int getSerializedLength() { 254 return 4 + buf.length; 255 } 256 257 @Override 258 public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) { 259 destination.putInt(buf.length); 260 Thread.yield(); 261 destination.put(buf); 262 destination.rewind(); 263 } 264 265 @Override 266 public CacheableDeserializer<Cacheable> getDeserializer() { 267 return blockDeserializer; 268 } 269 270 private static final int deserializerIdentifier; 271 static { 272 deserializerIdentifier = 273 CacheableDeserializerIdManager.registerDeserializer(blockDeserializer); 274 } 275 276 @Override 277 public BlockType getBlockType() { 278 return BlockType.DATA; 279 } 280 } 281 282 public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) { 283 return generateBlocksForPath(blockSize, numBlocks, null); 284 } 285 286 public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path, 287 boolean encoded) { 288 HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks]; 289 Random rand = ThreadLocalRandom.current(); 290 HashSet<String> usedStrings = new HashSet<>(); 291 for (int i = 0; i < numBlocks; i++) { 292 ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize); 293 Bytes.random(cachedBuffer.array()); 294 cachedBuffer.rewind(); 295 int onDiskSizeWithoutHeader = blockSize; 296 int uncompressedSizeWithoutHeader = blockSize; 297 long prevBlockOffset = rand.nextLong(); 298 BlockType.DATA.write(cachedBuffer); 299 cachedBuffer.putInt(onDiskSizeWithoutHeader); 300 cachedBuffer.putInt(uncompressedSizeWithoutHeader); 301 cachedBuffer.putLong(prevBlockOffset); 302 cachedBuffer.rewind(); 303 HFileContext meta = 304 new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS) 305 .withIncludesTags(false).withCompression(Compression.Algorithm.NONE) 306 .withDataBlockEncoding(DataBlockEncoding.FAST_DIFF).withBytesPerCheckSum(0) 307 .withChecksumType(ChecksumType.NULL).build(); 308 HFileBlock generated = new HFileBlock(encoded ? BlockType.ENCODED_DATA : BlockType.DATA, 309 onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, 310 ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize, 311 onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta, 312 ByteBuffAllocator.HEAP); 313 String key = null; 314 long offset = 0; 315 if (path != null) { 316 key = path.getName(); 317 offset = i * blockSize; 318 } else { 319 /* No conflicting keys */ 320 key = Long.toString(rand.nextLong()); 321 while (!usedStrings.add(key)) { 322 key = Long.toString(rand.nextLong()); 323 } 324 } 325 returnedBlocks[i] = new HFileBlockPair(); 326 returnedBlocks[i].blockName = 327 new BlockCacheKey(key, offset, true, encoded ? BlockType.ENCODED_DATA : BlockType.DATA); 328 returnedBlocks[i].block = generated; 329 } 330 return returnedBlocks; 331 } 332 333 public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path) { 334 return generateBlocksForPath(blockSize, numBlocks, path, false); 335 } 336 337 public static class HFileBlockPair { 338 BlockCacheKey blockName; 339 HFileBlock block; 340 341 public BlockCacheKey getBlockName() { 342 return this.blockName; 343 } 344 345 public HFileBlock getBlock() { 346 return this.block; 347 } 348 } 349 350 public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key, 351 Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) { 352 destBuffer.clear(); 353 cache.cacheBlock(key, blockToCache); 354 Cacheable actualBlock = cache.getBlock(key, false, false, false); 355 try { 356 actualBlock.serialize(destBuffer, true); 357 assertEquals(expectedBuffer, destBuffer); 358 } finally { 359 // Release the reference count increased by getBlock. 360 if (actualBlock != null) { 361 actualBlock.release(); 362 } 363 } 364 } 365 366 public static void testConvertToJSON(BlockCache toBeTested) { 367 try { 368 String json = BlockCacheUtil.toJSON(toBeTested); 369 assertNotNull(json); 370 } catch (Exception e) { 371 fail("conversion to JSON should not fail, exception is:" + e); 372 } 373 } 374}