001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023import static org.mockito.Mockito.doReturn; 024import static org.mockito.Mockito.mock; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.client.RegionInfoBuilder; 031import org.apache.hadoop.hbase.testclassification.RegionServerTests; 032import org.apache.hadoop.hbase.testclassification.SmallTests; 033import org.apache.hadoop.hbase.util.Threads; 034import org.junit.Before; 035import org.junit.ClassRule; 036import org.junit.Rule; 037import org.junit.Test; 038import org.junit.experimental.categories.Category; 039import org.junit.rules.TestName; 040 041@Category({ RegionServerTests.class, SmallTests.class }) 042public class TestMemStoreFlusher { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestMemStoreFlusher.class); 047 048 @Rule 049 public TestName name = new TestName(); 050 051 public MemStoreFlusher msf; 052 053 @Before 054 public void setUp() throws Exception { 055 Configuration conf = new Configuration(); 056 conf.set("hbase.hstore.flusher.count", "0"); 057 msf = new MemStoreFlusher(conf, null); 058 } 059 060 @Test 061 public void testReplaceDelayedFlushEntry() { 062 RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 063 .setRegionId(1).setReplicaId(0).build(); 064 HRegion r = mock(HRegion.class); 065 doReturn(hri).when(r).getRegionInfo(); 066 067 // put a delayed task with 30s delay 068 msf.requestDelayedFlush(r, 30000); 069 assertEquals(1, msf.getFlushQueueSize()); 070 assertTrue(msf.regionsInQueue.get(r).isDelay()); 071 072 // put a non-delayed task, then the delayed one should be replaced 073 assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); 074 assertEquals(1, msf.getFlushQueueSize()); 075 assertFalse(msf.regionsInQueue.get(r).isDelay()); 076 } 077 078 @Test 079 public void testNotReplaceDelayedFlushEntryWhichExpired() { 080 RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())) 081 .setRegionId(1).setReplicaId(0).build(); 082 HRegion r = mock(HRegion.class); 083 doReturn(hri).when(r).getRegionInfo(); 084 085 // put a delayed task with 100ms delay 086 msf.requestDelayedFlush(r, 100); 087 assertEquals(1, msf.getFlushQueueSize()); 088 assertTrue(msf.regionsInQueue.get(r).isDelay()); 089 090 Threads.sleep(200); 091 092 // put a non-delayed task, and the delayed one is expired, so it should not be replaced 093 assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY)); 094 assertEquals(1, msf.getFlushQueueSize()); 095 assertTrue(msf.regionsInQueue.get(r).isDelay()); 096 } 097 098 @Test 099 public void testChangeFlusherCount() { 100 Configuration conf = new Configuration(); 101 conf.set("hbase.hstore.flusher.count", "0"); 102 HRegionServer rs = mock(HRegionServer.class); 103 doReturn(false).when(rs).isStopped(); 104 doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting(); 105 106 msf = new MemStoreFlusher(conf, rs); 107 msf.start(Threads.LOGGING_EXCEPTION_HANDLER); 108 109 Configuration newConf = new Configuration(); 110 111 newConf.set("hbase.hstore.flusher.count", "3"); 112 msf.onConfigurationChange(newConf); 113 assertEquals(3, msf.getFlusherCount()); 114 115 newConf.set("hbase.hstore.flusher.count", "0"); 116 msf.onConfigurationChange(newConf); 117 assertEquals(1, msf.getFlusherCount()); 118 } 119}