/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;

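/**
 * Utility methods for exercising {@link BlockCache} implementations: they cache generated blocks,
 * read them back (possibly from many threads), and assert on hit/miss behavior, eviction, and
 * heap-size accounting.
 * <p>
 * A minimal usage sketch; the block size, counts, and passing score below are illustrative, and
 * {@code cache} may be any {@link BlockCache} implementation:
 *
 * <pre>
 * BlockCache cache = ...; // the cache under test
 * CacheTestUtils.testCacheSimple(cache, 8192, 100);
 * CacheTestUtils.testCacheMultiThreaded(cache, 8192, 10, 100, 0.8);
 * </pre>
 */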
public class CacheTestUtils {

  private static final boolean includesMemstoreTS = true;

  /**
   * Checks that the cache's reported heap size grows when a block is cached and returns to its
   * previous value once that same block is evicted.
   */
  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    /* When we cache something HeapSize should always increase */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    /* Post eviction, heapsize should be the same */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }

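  /**
   * Exercises the cache from {@code numThreads} concurrent threads: each thread repeatedly takes
   * one generated block, caches it, reads it back, and evicts it. Fails if the fraction of
   * successful read-backs falls below {@code passingScore}.
   */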
  public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize,
    final int numThreads, final int numQueries, final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    // One block per query; note that generateHFileBlocks takes (blockSize, numBlocks) in that
    // order, so the arguments must not be swapped.
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          if (!blocksToTest.isEmpty()) {
            HFileBlockPair ourBlock = blocksToTest.poll();
            // if we run out of blocks to test, then we should stop the tests.
            if (ourBlock == null) {
              ctx.setStopFlag(true);
              return;
            }
            toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
            Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
            if (retrievedBlock != null) {
              assertEquals(ourBlock.block, retrievedBlock);
              toBeTested.evictBlock(ourBlock.blockName);
              hits.incrementAndGet();
              assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
            } else {
              miss.incrementAndGet();
            }
            totalQueries.incrementAndGet();
          }
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
    }
  }

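  /**
   * Single-threaded sanity check: caches {@code numBlocks} generated blocks, verifies that every
   * block still present reads back equal to what was cached, and verifies that re-caching an
   * already cached block is rejected (BucketCache excepted, since it tolerates re-caching).
   */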
  public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks)
    throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
    // Confirm empty
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add blocks
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Check if all blocks are properly cached and contain the right
    // information, or the blocks are null.
    // MapMaker makes no guarantees when it will evict, so neither can we.

    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-add the same blocks; the cache should reject re-caching an already cached block.

    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache won't throw exception when caching already cached
            // block
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException re) {
        // expected
      }
    }
  }

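  /**
   * Hammers a single key from {@code numThreads} reader threads while one extra thread
   * periodically evicts and re-caches the block, until {@code numQueries} reads have completed.
   * Readers assert that any block they do see carries the expected bytes.
   */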
  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
    throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned =
            (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
          if (returned != null) {
            assertArrayEquals(buf, returned.buf);
          } else {
            Thread.sleep(10);
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    // add a thread to periodically evict and re-cache the block
    final long blockEvictPeriod = 50;
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        toBeTested.evictBlock(key);
        toBeTested.cacheBlock(key, bac);
        Thread.sleep(blockEvictPeriod);
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }

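  /**
   * Minimal {@link Cacheable} backed by a plain byte array; its serialized form is a 4-byte
   * length followed by the bytes. The {@code Thread.yield()} calls deliberately widen the window
   * for races during concurrent serialization and deserialization.
   */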
  public static class ByteArrayCacheable implements Cacheable {

    static final CacheableDeserializer<Cacheable> blockDeserializer =
      new CacheableDeserializer<Cacheable>() {
        @Override
        public int getDeserializerIdentifier() {
          return deserializerIdentifier;
        }

        @Override
        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
          int len = b.getInt();
          Thread.yield();
          byte[] buf = new byte[len];
          b.get(buf);
          return new ByteArrayCacheable(buf);
        }
      };

    final byte[] buf;

    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      return 4L + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    @Override
    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
      destination.putInt(buf.length);
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    private static final int deserializerIdentifier;
    static {
      deserializerIdentifier =
        CacheableDeserializerIdManager.registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }
  }

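  /**
   * Generates {@code numBlocks} data blocks of {@code blockSize} bytes of random content, each
   * under a unique, randomly named cache key.
   */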
  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
    return generateBlocksForPath(blockSize, numBlocks, null);
  }

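  /**
   * Like {@link #generateHFileBlocks(int, int)}, except that when {@code path} is non-null the
   * cache keys are derived from the file name with increasing offsets, mimicking the blocks of a
   * single HFile.
   */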
  public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = ThreadLocalRandom.current();
    HashSet<String> usedStrings = new HashSet<>();
    for (int i = 0; i < numBlocks; i++) {
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
      Bytes.random(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize;
      int uncompressedSizeWithoutHeader = blockSize;
      long prevBlockOffset = rand.nextLong();
      // Overwrite the start of the random payload with a fake block header: the DATA block magic,
      // the on-disk and uncompressed sizes, and the previous block's offset.
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(false).withCompression(Compression.Algorithm.NONE)
          .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL).build();
      HFileBlock generated =
        new HFileBlock(BlockType.DATA, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
          prevBlockOffset, ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
          onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
          ByteBuffAllocator.HEAP);
      String key = null;
      long offset = 0;
      if (path != null) {
        key = path.getName();
        // Widen before multiplying so large block counts cannot overflow the int math.
        offset = (long) i * blockSize;
      } else {
        /* No conflicting keys */
        key = Long.toString(rand.nextLong());
        while (!usedStrings.add(key)) {
          key = Long.toString(rand.nextLong());
        }
      }
      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName = new BlockCacheKey(key, offset);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

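  /** A generated block together with the cache key it should be stored under. */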
  public static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;

    public BlockCacheKey getBlockName() {
      return this.blockName;
    }

    public HFileBlock getBlock() {
      return this.block;
    }
  }

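  /**
   * Caches {@code blockToCache} under {@code key}, reads it back, serializes the result into
   * {@code destBuffer}, and asserts it equals {@code expectedBuffer}. The reference acquired by
   * the read is released even if an assertion fails.
   */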
  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
    Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
    destBuffer.clear();
    cache.cacheBlock(key, blockToCache);
    Cacheable actualBlock = cache.getBlock(key, false, false, false);
    // The block was just cached, so the read-back must not miss; fail clearly rather than NPE.
    assertNotNull(actualBlock);
    try {
      actualBlock.serialize(destBuffer, true);
      assertEquals(expectedBuffer, destBuffer);
    } finally {
      // Release the reference count increased by getBlock.
      actualBlock.release();
    }
  }
}