001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.hamcrest.CoreMatchers.is;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.concurrent.BlockingQueue;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
033import org.apache.hadoop.hbase.io.hfile.Cacheable;
034import org.apache.hadoop.hbase.io.hfile.HFileBlock;
035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
036import org.apache.hadoop.hbase.testclassification.IOTests;
037import org.apache.hadoop.hbase.testclassification.SmallTests;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044
045@Category({ IOTests.class, SmallTests.class })
046public class TestBucketWriterThread {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestBucketWriterThread.class);
051
052  private BucketCache bc;
053  private BucketCache.WriterThread wt;
054  private BlockingQueue<RAMQueueEntry> q;
055  private Cacheable plainCacheable;
056  private BlockCacheKey plainKey;
057
058  /** A BucketCache that does not start its writer threads. */
059  private static class MockBucketCache extends BucketCache {
060
061    public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
062      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
063      throws IOException {
064      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
065        persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
066    }
067
068    @Override
069    protected void startWriterThreads() {
070      // intentional noop
071    }
072  }
073
074  /**
075   * Set up variables and get BucketCache and WriterThread into state where tests can manually
076   * control the running of WriterThread and BucketCache is empty.
077   */
078  @Before
079  public void setUp() throws Exception {
080    // Arbitrary capacity.
081    final int capacity = 16;
082    // Run with one writer thread only. Means there will be one writer queue only too. We depend
083    // on this in below.
084    final int writerThreadsCount = 1;
085    this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount,
086      capacity, null, 100/* Tolerate ioerrors for 100ms */);
087    this.bc.waitForCacheInitialization(10000);
088    assertEquals(writerThreadsCount, bc.writerThreads.length);
089    assertEquals(writerThreadsCount, bc.writerQueues.size());
090    // Get reference to our single WriterThread instance.
091    this.wt = bc.writerThreads[0];
092    this.q = bc.writerQueues.get(0);
093
094    wt.disableWriter();
095    this.plainKey = new BlockCacheKey("f", 0);
096    this.plainCacheable = Mockito.mock(Cacheable.class);
097
098    assertThat(bc.ramCache.isEmpty(), is(true));
099    assertTrue(q.isEmpty());
100  }
101
102  @After
103  public void tearDown() throws Exception {
104    if (this.bc != null) this.bc.shutdown();
105  }
106
107  /**
108   * Test non-error case just works.
109   */
110  @Test
111  public void testNonErrorCase() throws IOException, InterruptedException {
112    bc.cacheBlock(this.plainKey, this.plainCacheable);
113    doDrainOfOneEntry(this.bc, this.wt, this.q);
114  }
115
116  /**
117   * Pass through a too big entry and ensure it is cleared from queues and ramCache. Manually run
118   * the WriterThread.
119   */
120  @Test
121  public void testTooBigEntry() throws InterruptedException {
122    Cacheable tooBigCacheable = Mockito.mock(Cacheable.class);
123    Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE);
124    this.bc.cacheBlock(this.plainKey, tooBigCacheable);
125    doDrainOfOneEntry(this.bc, this.wt, this.q);
126    assertTrue(bc.blocksByHFile.isEmpty());
127    assertTrue(bc.getBackingMap().isEmpty());
128  }
129
130  /**
131   * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then put it
132   * back and process it.
133   */
134  @SuppressWarnings("unchecked")
135  @Test
136  public void testIOE() throws IOException, InterruptedException {
137    this.bc.cacheBlock(this.plainKey, plainCacheable);
138    RAMQueueEntry rqe = q.remove();
139    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
140    Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(),
141      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
142    this.q.add(spiedRqe);
143    doDrainOfOneEntry(bc, wt, q);
144    assertTrue(bc.blocksByHFile.isEmpty());
145    assertTrue(bc.getBackingMap().isEmpty());
146    // Cache disabled when ioes w/o ever healing.
147    assertTrue(!bc.isCacheEnabled());
148  }
149
150  /**
151   * Do Cache full exception
152   */
153  @Test
154  public void testCacheFullException() throws IOException, InterruptedException {
155    this.bc.cacheBlock(this.plainKey, plainCacheable);
156    RAMQueueEntry rqe = q.remove();
157    RAMQueueEntry spiedRqe = Mockito.spy(rqe);
158    final CacheFullException cfe = new CacheFullException(0, 0);
159    BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
160    Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(),
161      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
162    this.q.add(spiedRqe);
163    doDrainOfOneEntry(bc, wt, q);
164  }
165
166  private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt,
167    final BlockingQueue<RAMQueueEntry> q) throws InterruptedException {
168    List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
169    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
170    assertTrue(q.isEmpty());
171    assertTrue(bc.ramCache.isEmpty());
172    assertEquals(0, bc.heapSize());
173  }
174}