/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(LargeTests.class)
public class TestFlushWithThroughputController {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFlushWithThroughputController.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFlushWithThroughputController.class);
  private static final double EPSILON = 1.3E-6;

  private HBaseTestingUtil hbtu;
  @Rule
  public TestName testName = new TestName();
  private TableName tableName;
  private final byte[] family = Bytes.toBytes("f");
  private final byte[] qualifier = Bytes.toBytes("q");

  @Before
  public void setUp() {
    hbtu = new HBaseTestingUtil();
    tableName = TableName.valueOf("Table-" + testName.getMethodName());
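    // Route flushes through PressureAwareFlushThroughputController so the tests
    // below can exercise flush throttling; the setting is read when each test
    // starts its mini cluster.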
    hbtu.getConfiguration().set(
      FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareFlushThroughputController.class.getName());
  }

  @After
  public void tearDown() throws Exception {
    hbtu.shutdownMiniCluster();
  }

  private HStore getStoreWithName(TableName tableName) {
    SingleProcessHBaseCluster cluster = hbtu.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
      HRegionServer hrs = rsts.get(i).getRegionServer();
      for (Region region : hrs.getRegions(tableName)) {
        return ((HRegion) region).getStores().iterator().next();
      }
    }
    return null;
  }

  private void setMaxMinThroughputs(long max, long min) {
    Configuration conf = hbtu.getConfiguration();
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, min);
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, max);
  }

  /**
   * Writes Puts to the table and flushes a few times.
   * @return {@link Pair} of (throughput, duration).
   */
  private Pair<Double, Long> generateAndFlushData(Table table) throws IOException {
    // Internally, throughput is controlled after every cell write, so keep the value size small
    // for finer-grained control.
    final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024;
    long duration = 0;
    for (int i = 0; i < NUM_FLUSHES; i++) {
      // Write about 10M (10 times the throughput rate) per iteration.
      for (int j = 0; j < NUM_PUTS; j++) {
        byte[] value = new byte[VALUE_SIZE];
        Bytes.random(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      long startTime = System.nanoTime();
      hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> {
        try {
          r.flush(true);
        } catch (IOException e) {
          LOG.error("Failed to flush region {}", r, e);
          fail("Failed to flush region " + r.getRegionInfo().getRegionNameAsString());
        }
      });
      duration += System.nanoTime() - startTime;
    }
    HStore store = getStoreWithName(tableName);
    assertEquals(NUM_FLUSHES, store.getStorefilesCount());
    double throughput =
      (double) store.getStorefilesSize() / TimeUnit.NANOSECONDS.toSeconds(duration);
    return new Pair<>(throughput, duration);
  }

  private long testFlushWithThroughputLimit() throws Exception {
    final long throughputLimit = 1024 * 1024;
    setMaxMinThroughputs(throughputLimit, throughputLimit);
    Configuration conf = hbtu.getConfiguration();
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
      throughputLimit);
    hbtu.startMiniCluster(1);
    Table table = hbtu.createTable(tableName, family);
    Pair<Double, Long> result = generateAndFlushData(table);
    hbtu.deleteTable(tableName);
    LOG.debug("Throughput is: " + (result.getFirst() / 1024 / 1024) + " MB/s");
    // Confirm that the speed limit works properly (not too fast, and also not too slow).
    // 20% is the maximum acceptable error rate.
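    // For example, with the 1 MB/s limit configured above, the measured
    // throughput must fall strictly between 0.8 MB/s and 1.2 MB/s.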
    assertTrue(result.getFirst() < throughputLimit * 1.2);
    assertTrue(result.getFirst() > throughputLimit * 0.8);
    return result.getSecond();
  }

  @Test
  public void testFlushControl() throws Exception {
    testFlushWithThroughputLimit();
  }

  /**
   * Test the tuning task of {@link PressureAwareFlushThroughputController}.
   */
  @Test
  public void testFlushThroughputTuning() throws Exception {
    Configuration conf = hbtu.getConfiguration();
    setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
      3000);
    hbtu.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    hbtu.getAdmin()
      .createTable(TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
        .build());
    hbtu.waitTableAvailable(tableName);
    HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
    double pressure = regionServer.getFlushPressure();
    LOG.debug("Flush pressure before flushing: " + pressure);
    PressureAwareFlushThroughputController throughputController =
      (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
    for (HRegion region : regionServer.getRegions()) {
      region.flush(true);
    }
    // We used to assert that the flush pressure is zero, but after HBASE-15787 or HBASE-18294 we
    // changed to use heapSize instead of dataSize to calculate the flush pressure, and since
    // heapSize will never be zero, the flush pressure will never be zero either. So the
    // assertion here only checks that flushing lowered the pressure.
    assertTrue(regionServer.getFlushPressure() < pressure);
    Thread.sleep(5000);
    Table table = conn.getTable(tableName);
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[256 * 1024];
        Bytes.random(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
    }
    Thread.sleep(5000);
    double expectedThroughput = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
    assertEquals(expectedThroughput, throughputController.getMaxThroughput(), EPSILON);

    conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
      NoLimitThroughputController.class.getName());
    regionServer.onConfigurationChange(conf);
    assertTrue(throughputController.isStopped());
    assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
    conn.close();
  }

  /**
   * Test the flush throughput limit with the stripe store engine.
   */
  @Test
  public void testFlushControlForStripedStore() throws Exception {
    hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
      StripeStoreEngine.class.getName());
    testFlushWithThroughputLimit();
  }
}