001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.mockito.Mockito.doReturn;
024import static org.mockito.Mockito.mock;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.RegionInfo;
030import org.apache.hadoop.hbase.client.RegionInfoBuilder;
031import org.apache.hadoop.hbase.testclassification.RegionServerTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.apache.hadoop.hbase.util.Threads;
034import org.junit.Before;
035import org.junit.ClassRule;
036import org.junit.Rule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039import org.junit.rules.TestName;
040
041@Category({ RegionServerTests.class, SmallTests.class })
042public class TestMemStoreFlusher {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestMemStoreFlusher.class);
047
048  @Rule
049  public TestName name = new TestName();
050
051  public MemStoreFlusher msf;
052
053  @Before
054  public void setUp() throws Exception {
055    Configuration conf = new Configuration();
056    conf.set("hbase.hstore.flusher.count", "0");
057    msf = new MemStoreFlusher(conf, null);
058  }
059
060  @Test
061  public void testReplaceDelayedFlushEntry() {
062    RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
063      .setRegionId(1).setReplicaId(0).build();
064    HRegion r = mock(HRegion.class);
065    doReturn(hri).when(r).getRegionInfo();
066
067    // put a delayed task with 30s delay
068    msf.requestDelayedFlush(r, 30000);
069    assertEquals(1, msf.getFlushQueueSize());
070    assertTrue(msf.regionsInQueue.get(r).isDelay());
071
072    // put a non-delayed task, then the delayed one should be replaced
073    assertTrue(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY));
074    assertEquals(1, msf.getFlushQueueSize());
075    assertFalse(msf.regionsInQueue.get(r).isDelay());
076  }
077
078  @Test
079  public void testNotReplaceDelayedFlushEntryWhichExpired() {
080    RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
081      .setRegionId(1).setReplicaId(0).build();
082    HRegion r = mock(HRegion.class);
083    doReturn(hri).when(r).getRegionInfo();
084
085    // put a delayed task with 100ms delay
086    msf.requestDelayedFlush(r, 100);
087    assertEquals(1, msf.getFlushQueueSize());
088    assertTrue(msf.regionsInQueue.get(r).isDelay());
089
090    Threads.sleep(200);
091
092    // put a non-delayed task, and the delayed one is expired, so it should not be replaced
093    assertFalse(msf.requestFlush(r, FlushLifeCycleTracker.DUMMY));
094    assertEquals(1, msf.getFlushQueueSize());
095    assertTrue(msf.regionsInQueue.get(r).isDelay());
096  }
097
098  @Test
099  public void testChangeFlusherCount() {
100    Configuration conf = new Configuration();
101    conf.set("hbase.hstore.flusher.count", "0");
102    HRegionServer rs = mock(HRegionServer.class);
103    doReturn(false).when(rs).isStopped();
104    doReturn(new RegionServerAccounting(conf)).when(rs).getRegionServerAccounting();
105
106    msf = new MemStoreFlusher(conf, rs);
107    msf.start(Threads.LOGGING_EXCEPTION_HANDLER);
108
109    Configuration newConf = new Configuration();
110
111    newConf.set("hbase.hstore.flusher.count", "3");
112    msf.onConfigurationChange(newConf);
113    assertEquals(3, msf.getFlusherCount());
114
115    newConf.set("hbase.hstore.flusher.count", "0");
116    msf.onConfigurationChange(newConf);
117    assertEquals(1, msf.getFlusherCount());
118  }
119}