001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.hamcrest.CoreMatchers.is; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.nio.ByteBuffer; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.concurrent.BlockingQueue; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 033import org.apache.hadoop.hbase.io.hfile.Cacheable; 034import org.apache.hadoop.hbase.io.hfile.HFileBlock; 035import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; 036import org.apache.hadoop.hbase.testclassification.IOTests; 037import org.apache.hadoop.hbase.testclassification.SmallTests; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.mockito.Mockito; 044 045@Category({ IOTests.class, SmallTests.class }) 046public class TestBucketWriterThread { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestBucketWriterThread.class); 051 052 private BucketCache bc; 053 private BucketCache.WriterThread wt; 054 private BlockingQueue<RAMQueueEntry> q; 055 private Cacheable plainCacheable; 056 private BlockCacheKey plainKey; 057 058 /** A BucketCache that does not start its writer threads. */ 059 private static class MockBucketCache extends BucketCache { 060 061 public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, 062 int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) 063 throws IOException { 064 super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, 065 persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); 066 } 067 068 @Override 069 protected void startWriterThreads() { 070 // intentional noop 071 } 072 } 073 074 /** 075 * Set up variables and get BucketCache and WriterThread into state where tests can manually 076 * control the running of WriterThread and BucketCache is empty. 077 */ 078 @Before 079 public void setUp() throws Exception { 080 // Arbitrary capacity. 081 final int capacity = 16; 082 // Run with one writer thread only. Means there will be one writer queue only too. We depend 083 // on this in below. 084 final int writerThreadsCount = 1; 085 this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount, 086 capacity, null, 100/* Tolerate ioerrors for 100ms */); 087 this.bc.waitForCacheInitialization(10000); 088 assertEquals(writerThreadsCount, bc.writerThreads.length); 089 assertEquals(writerThreadsCount, bc.writerQueues.size()); 090 // Get reference to our single WriterThread instance. 091 this.wt = bc.writerThreads[0]; 092 this.q = bc.writerQueues.get(0); 093 094 wt.disableWriter(); 095 this.plainKey = new BlockCacheKey("f", 0); 096 this.plainCacheable = Mockito.mock(Cacheable.class); 097 098 assertThat(bc.ramCache.isEmpty(), is(true)); 099 assertTrue(q.isEmpty()); 100 } 101 102 @After 103 public void tearDown() throws Exception { 104 if (this.bc != null) this.bc.shutdown(); 105 } 106 107 /** 108 * Test non-error case just works. 109 */ 110 @Test 111 public void testNonErrorCase() throws IOException, InterruptedException { 112 bc.cacheBlock(this.plainKey, this.plainCacheable); 113 doDrainOfOneEntry(this.bc, this.wt, this.q); 114 } 115 116 /** 117 * Pass through a too big entry and ensure it is cleared from queues and ramCache. Manually run 118 * the WriterThread. 119 */ 120 @Test 121 public void testTooBigEntry() throws InterruptedException { 122 Cacheable tooBigCacheable = Mockito.mock(Cacheable.class); 123 Mockito.when(tooBigCacheable.getSerializedLength()).thenReturn(Integer.MAX_VALUE); 124 this.bc.cacheBlock(this.plainKey, tooBigCacheable); 125 doDrainOfOneEntry(this.bc, this.wt, this.q); 126 assertTrue(bc.blocksByHFile.isEmpty()); 127 assertTrue(bc.getBackingMap().isEmpty()); 128 } 129 130 /** 131 * Do IOE. Take the RAMQueueEntry that was on the queue, doctor it to throw exception, then put it 132 * back and process it. 133 */ 134 @SuppressWarnings("unchecked") 135 @Test 136 public void testIOE() throws IOException, InterruptedException { 137 this.bc.cacheBlock(this.plainKey, plainCacheable); 138 RAMQueueEntry rqe = q.remove(); 139 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 140 Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(), 141 Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 142 this.q.add(spiedRqe); 143 doDrainOfOneEntry(bc, wt, q); 144 assertTrue(bc.blocksByHFile.isEmpty()); 145 assertTrue(bc.getBackingMap().isEmpty()); 146 // Cache disabled when ioes w/o ever healing. 147 assertTrue(!bc.isCacheEnabled()); 148 } 149 150 /** 151 * Do Cache full exception 152 */ 153 @Test 154 public void testCacheFullException() throws IOException, InterruptedException { 155 this.bc.cacheBlock(this.plainKey, plainCacheable); 156 RAMQueueEntry rqe = q.remove(); 157 RAMQueueEntry spiedRqe = Mockito.spy(rqe); 158 final CacheFullException cfe = new CacheFullException(0, 0); 159 BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); 160 Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(), 161 Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); 162 this.q.add(spiedRqe); 163 doDrainOfOneEntry(bc, wt, q); 164 } 165 166 private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.WriterThread wt, 167 final BlockingQueue<RAMQueueEntry> q) throws InterruptedException { 168 List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); 169 bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); 170 assertTrue(q.isEmpty()); 171 assertTrue(bc.ramCache.isEmpty()); 172 assertEquals(0, bc.heapSize()); 173 } 174}