001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.Assert.assertEquals; 021 022import com.thimbleware.jmemcached.CacheElement; 023import com.thimbleware.jmemcached.CacheImpl; 024import com.thimbleware.jmemcached.Key; 025import com.thimbleware.jmemcached.LocalCacheElement; 026import com.thimbleware.jmemcached.MemCacheDaemon; 027import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap; 028import com.thimbleware.jmemcached.storage.hash.ConcurrentLinkedHashMap.EvictionPolicy; 029import java.net.InetSocketAddress; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.Waiter; 035import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 036import org.apache.hadoop.hbase.testclassification.IOTests; 037import org.apache.hadoop.hbase.testclassification.SmallTests; 038import org.junit.AfterClass; 039import org.junit.Before; 040import org.junit.BeforeClass; 041import org.junit.ClassRule; 042import org.junit.Test; 043import org.junit.experimental.categories.Category; 044 045@Category({ IOTests.class, SmallTests.class }) 046public class TestMemcachedBlockCache { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestMemcachedBlockCache.class); 051 052 static MemCacheDaemon<? extends CacheElement> MEMCACHED; 053 static MemcachedBlockCache CACHE; 054 055 @Before 056 public void before() throws Exception { 057 MEMCACHED.getCache().flush_all(); 058 assertEquals("Memcache is not empty", MEMCACHED.getCache().getCurrentItems(), 0); 059 } 060 061 @BeforeClass 062 public static void setup() throws Exception { 063 int port = HBaseTestingUtility.randomFreePort(); 064 MEMCACHED = createDaemon(port); 065 Configuration conf = new Configuration(); 066 conf.set("hbase.cache.memcached.servers", "localhost:" + port); 067 CACHE = new MemcachedBlockCache(conf); 068 } 069 070 @AfterClass 071 public static void tearDown() throws Exception { 072 if (MEMCACHED != null) { 073 MEMCACHED.stop(); 074 } 075 } 076 077 @Test 078 public void testCache() throws Exception { 079 final int NUM_BLOCKS = 10; 080 HFileBlockPair[] blocks = 081 CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS); 082 for (int i = 0; i < NUM_BLOCKS; i++) { 083 CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); 084 } 085 Waiter.waitFor(new Configuration(), 10000, 086 () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS); 087 } 088 089 @Test 090 public void testEviction() throws Exception { 091 final int NUM_BLOCKS = 10; 092 HFileBlockPair[] blocks = 093 CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, NUM_BLOCKS); 094 for (int i = 0; i < NUM_BLOCKS; i++) { 095 CACHE.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); 096 } 097 Waiter.waitFor(new Configuration(), 10000, 098 () -> MEMCACHED.getCache().getCurrentItems() == NUM_BLOCKS); 099 for (int i = 0; i < NUM_BLOCKS; i++) { 100 CACHE.evictBlock(blocks[i].getBlockName()); 101 } 102 Waiter.waitFor(new Configuration(), 10000, () -> MEMCACHED.getCache().getCurrentItems() == 0); 103 } 104 105 private static MemCacheDaemon<? extends CacheElement> createDaemon(int port) { 106 InetSocketAddress addr = new InetSocketAddress("localhost", port); 107 MemCacheDaemon<LocalCacheElement> daemon = new MemCacheDaemon<LocalCacheElement>(); 108 ConcurrentLinkedHashMap<Key, LocalCacheElement> cacheStorage = 109 ConcurrentLinkedHashMap.create(EvictionPolicy.LRU, 1000, 1024 * 1024); 110 daemon.setCache(new CacheImpl(cacheStorage)); 111 daemon.setAddr(addr); 112 daemon.setVerbose(true); 113 daemon.start(); 114 while (!daemon.isRunning()) { 115 try { 116 Thread.sleep(100); 117 } catch (InterruptedException e) { 118 Thread.currentThread().interrupt(); 119 } 120 } 121 return daemon; 122 } 123 124}