001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021
022import com.thimbleware.jmemcached.CacheElement;
023import com.thimbleware.jmemcached.CacheImpl;
024import com.thimbleware.jmemcached.Key;
025import com.thimbleware.jmemcached.LocalCacheElement;
026import com.thimbleware.jmemcached.MemCacheDaemon;
027import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap;
028import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap.EvictionPolicy;
029import java.net.InetSocketAddress;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.Waiter;
035import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
036import org.apache.hadoop.hbase.testclassification.IOTests;
037import org.apache.hadoop.hbase.testclassification.SmallTests;
038import org.junit.AfterClass;
039import org.junit.Before;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044
045@Category({ IOTests.class, SmallTests.class })
046public class TestMemcachedBlockCache {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestMemcachedBlockCache.class);
051
052  static MemCacheDaemon<? extends CacheElement> MEMCACHED;
053  static MemcachedBlockCache CACHE;
054
055  @Before
056  public void before() throws Exception {
057    MEMCACHED.getCache().flush_all();
058    assertEquals("Memcache is not empty", MEMCACHED.getCache().getCurrentItems(), 0);
059  }
060
061  @BeforeClass
062  public static void setup() throws Exception {
063    int port = HBaseTestingUtility.randomFreePort();
064    MEMCACHED = createDaemon(port);
065    Configuration conf = new Configuration();
066    conf.set("hbase.cache.memcached.servers", "localhost:" + port);
067    CACHE = new MemcachedBlockCache(conf);
068  }
069
070  @AfterClass
071  public static void tearDown() throws Exception {
072    if (MEMCACHED != null) {
073      MEMCACHED.stop();
074    }
075  }
076
077  @Test
078  public void testCache() throws Exception {
079    final int NUM_BLOCKS = 10;
080    HFileBlockPair[] blocks =
081      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS);
082    for (int i = 0; i < NUM_BLOCKS; i++) {
083      CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
084    }
085    Waiter.waitFor(new Configuration(), 10000,
086      () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
087  }
088
089  @Test
090  public void testEviction() throws Exception {
091    final int NUM_BLOCKS = 10;
092    HFileBlockPair[] blocks =
093      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS);
094    for (int i = 0; i < NUM_BLOCKS; i++) {
095      CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
096    }
097    Waiter.waitFor(new Configuration(), 10000,
098      () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS);
099    for (int i = 0; i < NUM_BLOCKS; i++) {
100      CACHE.evictBlock(blocks[i].getBlockName());
101    }
102    Waiter.waitFor(new Configuration(), 10000, () -> MEMCACHED.getCache().getCurrentItems() == 0);
103  }
104
105  private static MemCacheDaemon<? extends CacheElement> createDaemon(int port) {
106    InetSocketAddress addr = new InetSocketAddress("localhost", port);
107    MemCacheDaemon<LocalCacheElement> daemon = new MemCacheDaemon<LocalCacheElement>();
108    ConcurrentLinkedHashMap<Key, LocalCacheElement> cacheStorage =
109      ConcurrentLinkedHashMap.create(EvictionPolicy.LRU, 1000, 1024 * 1024);
110    daemon.setCache(new CacheImpl(cacheStorage));
111    daemon.setAddr(addr);
112    daemon.setVerbose(true);
113    daemon.start();
114    while (!daemon.isRunning()) {
115      try {
116        Thread.sleep(100);
117      } catch (InterruptedException e) {
118        Thread.currentThread().interrupt();
119      }
120    }
121    return daemon;
122  }
123
124}