/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;

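/**
 * Utilities for exercising {@link BlockCache} implementations: heap-size accounting checks,
 * single-threaded and multi-threaded cache/read/evict round trips, and generators for dummy
 * {@link HFileBlock}s. A typical caller might look like the (hypothetical) sketch below:
 *
 * <pre>
 * BlockCache cache = ...; // the cache implementation under test
 * CacheTestUtils.testCacheSimple(cache, 8192, 100);
 * CacheTestUtils.testCacheMultiThreaded(cache, 8192, 4, 100, 0.9);
 * </pre>
 */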
public class CacheTestUtils {

  private static final boolean includesMemstoreTS = true;

  /**
   * Checks that the heap size grows when a block is cached and returns to its original value once
   * the same block is evicted.
   */
  public static void testHeapSizeChanges(final BlockCache toBeTested, final int blockSize) {
    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, 1);
    long heapSize = ((HeapSize) toBeTested).heapSize();
    toBeTested.cacheBlock(blocks[0].blockName, blocks[0].block);

    /* When we cache something HeapSize should always increase */
    assertTrue(heapSize < ((HeapSize) toBeTested).heapSize());

    toBeTested.evictBlock(blocks[0].blockName);

    /* Post eviction, heapsize should be back to the original value */
    assertEquals(heapSize, ((HeapSize) toBeTested).heapSize());
  }

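  /**
   * Hammers the cache from {@code numThreads} threads: each thread polls a block from a shared
   * queue, caches it, reads it back, and evicts it on a hit. Fails if the overall hit ratio falls
   * below {@code passingScore}.
   */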
  public static void testCacheMultiThreaded(final BlockCache toBeTested, final int blockSize,
    final int numThreads, final int numQueries, final double passingScore) throws Exception {

    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<>();
    final AtomicInteger hits = new AtomicInteger();
    final AtomicInteger miss = new AtomicInteger();

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
    blocksToTest.addAll(Arrays.asList(blocks));

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          HFileBlockPair ourBlock = blocksToTest.poll();
          // If we run out of blocks to test, stop the test threads.
          if (ourBlock == null) {
            ctx.setStopFlag(true);
            return;
          }
          toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
          Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName, false, false, true);
          if (retrievedBlock != null) {
            assertEquals(ourBlock.block, retrievedBlock);
            toBeTested.evictBlock(ourBlock.blockName);
            hits.incrementAndGet();
            assertNull(toBeTested.getBlock(ourBlock.blockName, false, false, true));
          } else {
            miss.incrementAndGet();
          }
          totalQueries.incrementAndGet();
        }
      };
      t.setDaemon(true);
      ctx.addThread(t);
    }
    ctx.startThreads();
    while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    if (hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
      fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
    }
  }

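  /**
   * Single-threaded smoke test: confirms the cache starts empty, caches {@code numBlocks} blocks,
   * verifies whatever is still cached, and checks the behavior of re-caching already cached
   * blocks.
   */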
  public static void testCacheSimple(BlockCache toBeTested, int blockSize, int numBlocks)
    throws Exception {

    HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
    // Confirm empty
    for (HFileBlockPair block : blocks) {
      assertNull(toBeTested.getBlock(block.blockName, true, false, true));
    }

    // Add blocks
    for (HFileBlockPair block : blocks) {
      toBeTested.cacheBlock(block.blockName, block.block);
    }

    // Check that every block still cached contains the right information. The cache makes no
    // guarantees about when it evicts, so neither can we: a null (already evicted) block is
    // acceptable.
    for (HFileBlockPair block : blocks) {
      HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true, false, true);
      if (buf != null) {
        assertEquals(block.block, buf);
      }
    }

    // Re-add the same blocks. Caches are expected to throw a RuntimeException when asked to cache
    // an already cached block; BucketCache is the exception and tolerates the duplicate silently.
    for (HFileBlockPair block : blocks) {
      try {
        if (toBeTested.getBlock(block.blockName, true, false, true) != null) {
          toBeTested.cacheBlock(block.blockName, block.block);
          if (!(toBeTested instanceof BucketCache)) {
            // BucketCache won't throw an exception when caching an already cached block
            fail("Cache should not allow re-caching a block");
          }
        }
      } catch (RuntimeException re) {
        // expected
      }
    }

    testConvertToJSON(toBeTested);
  }

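  /**
   * Has {@code numThreads} reader threads repeatedly fetch a single key while one more thread
   * periodically evicts and re-caches it, exercising concurrent read/evict paths on one entry.
   */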
  public static void hammerSingleKey(final BlockCache toBeTested, int numThreads, int numQueries)
    throws Exception {
    final BlockCacheKey key = new BlockCacheKey("key", 0);
    final byte[] buf = new byte[5 * 1024];
    Arrays.fill(buf, (byte) 5);

    final ByteArrayCacheable bac = new ByteArrayCacheable(buf);
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalQueries = new AtomicInteger();
    toBeTested.cacheBlock(key, bac);

    for (int i = 0; i < numThreads; i++) {
      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        @Override
        public void doAnAction() throws Exception {
          ByteArrayCacheable returned =
            (ByteArrayCacheable) toBeTested.getBlock(key, false, false, true);
          if (returned != null) {
            assertArrayEquals(buf, returned.buf);
          } else {
            Thread.sleep(10);
          }
          totalQueries.incrementAndGet();
        }
      };

      t.setDaemon(true);
      ctx.addThread(t);
    }

    // Add a thread to periodically evict and re-cache the block
    final long blockEvictPeriod = 50;
    TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        toBeTested.evictBlock(key);
        toBeTested.cacheBlock(key, bac);
        Thread.sleep(blockEvictPeriod);
      }
    };
    t.setDaemon(true);
    ctx.addThread(t);

    ctx.startThreads();
    while (totalQueries.get() < numQueries && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
  }

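  /**
   * Minimal {@link Cacheable} wrapping a plain byte array. The {@code Thread.yield()} calls in
   * {@code serialize}/{@code deserialize} appear intended to widen race windows for the
   * multi-threaded tests above.
   */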
  public static class ByteArrayCacheable implements Cacheable {

    static final CacheableDeserializer<Cacheable> blockDeserializer =
      new CacheableDeserializer<Cacheable>() {
        @Override
        public int getDeserializerIdentifier() {
          return deserializerIdentifier;
        }

        @Override
        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
          int len = b.getInt();
          Thread.yield();
          byte[] buf = new byte[len];
          b.get(buf);
          return new ByteArrayCacheable(buf);
        }
      };

    final byte[] buf;

    public ByteArrayCacheable(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public long heapSize() {
      return 4L + buf.length;
    }

    @Override
    public int getSerializedLength() {
      return 4 + buf.length;
    }

    @Override
    public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
      destination.putInt(buf.length);
      Thread.yield();
      destination.put(buf);
      destination.rewind();
    }

    @Override
    public CacheableDeserializer<Cacheable> getDeserializer() {
      return blockDeserializer;
    }

    private static final int deserializerIdentifier;
    static {
      deserializerIdentifier =
        CacheableDeserializerIdManager.registerDeserializer(blockDeserializer);
    }

    @Override
    public BlockType getBlockType() {
      return BlockType.DATA;
    }
  }

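  /**
   * Generates {@code numBlocks} dummy data blocks of {@code blockSize} bytes, each stored under a
   * unique random key.
   */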
  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
    return generateBlocksForPath(blockSize, numBlocks, null);
  }

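  /**
   * Generates {@code numBlocks} dummy blocks of {@code blockSize} bytes. If {@code path} is
   * non-null, blocks are keyed by the file name at increasing offsets; otherwise each block gets a
   * unique random key at offset 0. {@code encoded} selects between {@link BlockType#ENCODED_DATA}
   * and plain {@link BlockType#DATA}.
   */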
  public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path,
    boolean encoded) {
    HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
    Random rand = ThreadLocalRandom.current();
    HashSet<String> usedStrings = new HashSet<>();
    for (int i = 0; i < numBlocks; i++) {
      ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize);
      Bytes.random(cachedBuffer.array());
      cachedBuffer.rewind();
      int onDiskSizeWithoutHeader = blockSize;
      int uncompressedSizeWithoutHeader = blockSize;
      long prevBlockOffset = rand.nextLong();
      BlockType.DATA.write(cachedBuffer);
      cachedBuffer.putInt(onDiskSizeWithoutHeader);
      cachedBuffer.putInt(uncompressedSizeWithoutHeader);
      cachedBuffer.putLong(prevBlockOffset);
      cachedBuffer.rewind();
      HFileContext meta =
        new HFileContextBuilder().withHBaseCheckSum(false).withIncludesMvcc(includesMemstoreTS)
          .withIncludesTags(false).withCompression(Compression.Algorithm.NONE)
          .withDataBlockEncoding(DataBlockEncoding.FAST_DIFF).withBytesPerCheckSum(0)
          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock generated = new HFileBlock(encoded ? BlockType.ENCODED_DATA : BlockType.DATA,
        onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset,
        ByteBuff.wrap(cachedBuffer), HFileBlock.DONT_FILL_HEADER, blockSize,
        onDiskSizeWithoutHeader + HConstants.HFILEBLOCK_HEADER_SIZE, -1, meta,
        ByteBuffAllocator.HEAP);
      String key = null;
      long offset = 0;
      if (path != null) {
        key = path.getName();
        // Widen to long before multiplying to avoid int overflow for large inputs.
        offset = (long) i * blockSize;
      } else {
        /* No conflicting keys */
        key = Long.toString(rand.nextLong());
        while (!usedStrings.add(key)) {
          key = Long.toString(rand.nextLong());
        }
      }
      returnedBlocks[i] = new HFileBlockPair();
      returnedBlocks[i].blockName =
        new BlockCacheKey(key, offset, true, encoded ? BlockType.ENCODED_DATA : BlockType.DATA);
      returnedBlocks[i].block = generated;
    }
    return returnedBlocks;
  }

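  /**
   * Convenience overload of {@link #generateBlocksForPath(int, int, Path, boolean)} for unencoded
   * blocks.
   */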
  public static HFileBlockPair[] generateBlocksForPath(int blockSize, int numBlocks, Path path) {
    return generateBlocksForPath(blockSize, numBlocks, path, false);
  }

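  /** A generated block together with the cache key it should be stored under. */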
  public static class HFileBlockPair {
    BlockCacheKey blockName;
    HFileBlock block;

    public BlockCacheKey getBlockName() {
      return this.blockName;
    }

    public HFileBlock getBlock() {
      return this.block;
    }
  }

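  /**
   * Caches {@code blockToCache} under {@code key}, reads it back, serializes it into
   * {@code destBuffer}, and asserts the result equals {@code expectedBuffer}, releasing the
   * retrieved block afterwards.
   */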
  public static void getBlockAndAssertEquals(BlockCache cache, BlockCacheKey key,
    Cacheable blockToCache, ByteBuffer destBuffer, ByteBuffer expectedBuffer) {
    destBuffer.clear();
    cache.cacheBlock(key, blockToCache);
    Cacheable actualBlock = cache.getBlock(key, false, false, false);
    // Fail with a clear assertion rather than an NPE if the block is missing.
    assertNotNull(actualBlock);
    try {
      actualBlock.serialize(destBuffer, true);
      assertEquals(expectedBuffer, destBuffer);
    } finally {
      // Release the reference count increased by getBlock.
      actualBlock.release();
    }
  }

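  /** Asserts that the cache can be rendered to JSON without throwing. */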
  public static void testConvertToJSON(BlockCache toBeTested) {
    try {
      String json = BlockCacheUtil.toJSON(toBeTested);
      assertNotNull(json);
    } catch (Exception e) {
      fail("Conversion to JSON should not fail, exception is: " + e);
    }
  }
}