001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.ArgumentMatchers.anyInt;
025import static org.mockito.ArgumentMatchers.anyString;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.net.InetSocketAddress;
031import java.util.List;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ForkJoinPool;
036import java.util.concurrent.ThreadLocalRandom;
037import net.spy.memcached.CachedData;
038import net.spy.memcached.ConnectionFactory;
039import net.spy.memcached.FailureMode;
040import net.spy.memcached.MemcachedClient;
041import net.spy.memcached.internal.OperationFuture;
042import net.spy.memcached.ops.Operation;
043import net.spy.memcached.ops.OperationState;
044import net.spy.memcached.ops.OperationStatus;
045import net.spy.memcached.transcoders.Transcoder;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.HBaseClassTestRule;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.Waiter;
050import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
051import org.apache.hadoop.hbase.testclassification.IOTests;
052import org.apache.hadoop.hbase.testclassification.SmallTests;
053import org.junit.Before;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057
058@Category({ IOTests.class, SmallTests.class })
059public class TestMemcachedBlockCache {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestMemcachedBlockCache.class);
064
065  private MemcachedBlockCache cache;
066
067  private ConcurrentMap<String, CachedData> backingMap;
068
069  @Before
070  public void setup() throws Exception {
071    int port = ThreadLocalRandom.current().nextInt(1024, 65536);
072    Configuration conf = new Configuration();
073    conf.set("hbase.cache.memcached.servers", "localhost:" + port);
074    backingMap = new ConcurrentHashMap<>();
075    cache = new MemcachedBlockCache(conf) {
076
077      private <T> OperationFuture<T> createFuture(String key, long opTimeout, T result) {
078        OperationFuture<T> future =
079          new OperationFuture<>(key, new CountDownLatch(0), opTimeout, ForkJoinPool.commonPool());
080        Operation op = mock(Operation.class);
081        when(op.getState()).thenReturn(OperationState.COMPLETE);
082        future.setOperation(op);
083        future.set(result, new OperationStatus(true, ""));
084
085        return future;
086      }
087
088      @Override
089      protected MemcachedClient createMemcachedClient(ConnectionFactory factory,
090        List<InetSocketAddress> serverAddresses) throws IOException {
091        assertEquals(FailureMode.Redistribute, factory.getFailureMode());
092        assertTrue(factory.isDaemon());
093        assertFalse(factory.useNagleAlgorithm());
094        assertEquals(MAX_SIZE, factory.getReadBufSize());
095        assertEquals(1, serverAddresses.size());
096        assertEquals("localhost", serverAddresses.get(0).getHostName());
097        assertEquals(port, serverAddresses.get(0).getPort());
098        MemcachedClient client = mock(MemcachedClient.class);
099        when(client.set(anyString(), anyInt(), any(), any())).then(inv -> {
100          String key = inv.getArgument(0);
101          HFileBlock block = inv.getArgument(2);
102          Transcoder<HFileBlock> tc = inv.getArgument(3);
103          CachedData cd = tc.encode(block);
104          backingMap.put(key, cd);
105          return createFuture(key, factory.getOperationTimeout(), true);
106        });
107        when(client.delete(anyString())).then(inv -> {
108          String key = inv.getArgument(0);
109          backingMap.remove(key);
110          return createFuture(key, factory.getOperationTimeout(), true);
111        });
112        when(client.get(anyString(), any())).then(inv -> {
113          String key = inv.getArgument(0);
114          Transcoder<HFileBlock> tc = inv.getArgument(1);
115          CachedData cd = backingMap.get(key);
116          return tc.decode(cd);
117        });
118        return client;
119      }
120    };
121  }
122
123  @Test
124  public void testCache() throws Exception {
125    final int numBlocks = 10;
126    HFileBlockPair[] blocks =
127      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks);
128    for (int i = 0; i < numBlocks; i++) {
129      cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
130    }
131    Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks);
132    for (int i = 0; i < numBlocks; i++) {
133      HFileBlock actual = (HFileBlock) cache.getBlock(blocks[i].getBlockName(), false, false, true);
134      HFileBlock expected = blocks[i].getBlock();
135      assertEquals(expected.getBlockType(), actual.getBlockType());
136      assertEquals(expected.getSerializedLength(), actual.getSerializedLength());
137    }
138  }
139
140  @Test
141  public void testEviction() throws Exception {
142    final int numBlocks = 10;
143    HFileBlockPair[] blocks =
144      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks);
145    for (int i = 0; i < numBlocks; i++) {
146      cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
147    }
148    Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks);
149    for (int i = 0; i < numBlocks; i++) {
150      cache.evictBlock(blocks[i].getBlockName());
151    }
152    Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == 0);
153  }
154
155}