/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
045import net.spy.memcached.transcoders.Transcoder; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.HBaseClassTestRule; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.Waiter; 050import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 051import org.apache.hadoop.hbase.testclassification.IOTests; 052import org.apache.hadoop.hbase.testclassification.SmallTests; 053import org.junit.Before; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057 058@Category({ IOTests.class, SmallTests.class }) 059public class TestMemcachedBlockCache { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestMemcachedBlockCache.class); 064 065 private MemcachedBlockCache cache; 066 067 private ConcurrentMap<String, CachedData> backingMap; 068 069 @Before 070 public void setup() throws Exception { 071 int port = ThreadLocalRandom.current().nextInt(1024, 65536); 072 Configuration conf = new Configuration(); 073 conf.set("hbase.cache.memcached.servers", "localhost:" + port); 074 backingMap = new ConcurrentHashMap<>(); 075 cache = new MemcachedBlockCache(conf) { 076 077 private <T> OperationFuture<T> createFuture(String key, long opTimeout, T result) { 078 OperationFuture<T> future = 079 new OperationFuture<>(key, new CountDownLatch(0), opTimeout, ForkJoinPool.commonPool()); 080 Operation op = mock(Operation.class); 081 when(op.getState()).thenReturn(OperationState.COMPLETE); 082 future.setOperation(op); 083 future.set(result, new OperationStatus(true, "")); 084 085 return future; 086 } 087 088 @Override 089 protected MemcachedClient createMemcachedClient(ConnectionFactory factory, 090 List<InetSocketAddress> serverAddresses) throws IOException { 091 assertEquals(FailureMode.Redistribute, factory.getFailureMode()); 092 assertTrue(factory.isDaemon()); 093 assertFalse(factory.useNagleAlgorithm()); 094 
assertEquals(MAX_SIZE, factory.getReadBufSize()); 095 assertEquals(1, serverAddresses.size()); 096 assertEquals("localhost", serverAddresses.get(0).getHostName()); 097 assertEquals(port, serverAddresses.get(0).getPort()); 098 MemcachedClient client = mock(MemcachedClient.class); 099 when(client.set(anyString(), anyInt(), any(), any())).then(inv -> { 100 String key = inv.getArgument(0); 101 HFileBlock block = inv.getArgument(2); 102 Transcoder<HFileBlock> tc = inv.getArgument(3); 103 CachedData cd = tc.encode(block); 104 backingMap.put(key, cd); 105 return createFuture(key, factory.getOperationTimeout(), true); 106 }); 107 when(client.delete(anyString())).then(inv -> { 108 String key = inv.getArgument(0); 109 backingMap.remove(key); 110 return createFuture(key, factory.getOperationTimeout(), true); 111 }); 112 when(client.get(anyString(), any())).then(inv -> { 113 String key = inv.getArgument(0); 114 Transcoder<HFileBlock> tc = inv.getArgument(1); 115 CachedData cd = backingMap.get(key); 116 return tc.decode(cd); 117 }); 118 return client; 119 } 120 }; 121 } 122 123 @Test 124 public void testCache() throws Exception { 125 final int numBlocks = 10; 126 HFileBlockPair[] blocks = 127 CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks); 128 for (int i = 0; i < numBlocks; i++) { 129 cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); 130 } 131 Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks); 132 for (int i = 0; i < numBlocks; i++) { 133 HFileBlock actual = (HFileBlock) cache.getBlock(blocks[i].getBlockName(), false, false, true); 134 HFileBlock expected = blocks[i].getBlock(); 135 assertEquals(expected.getBlockType(), actual.getBlockType()); 136 assertEquals(expected.getSerializedLength(), actual.getSerializedLength()); 137 } 138 } 139 140 @Test 141 public void testEviction() throws Exception { 142 final int numBlocks = 10; 143 HFileBlockPair[] blocks = 144 
CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks); 145 for (int i = 0; i < numBlocks; i++) { 146 cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); 147 } 148 Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks); 149 for (int i = 0; i < numBlocks; i++) { 150 cache.evictBlock(blocks[i].getBlockName()); 151 } 152 Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == 0); 153 } 154 155}