/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mini-cluster tests that run flushes with {@link PressureAwareFlushThroughputController}
 * configured as the flush throughput controller, checking both the measured flush rate and the
 * controller's periodic tuning of its maximum throughput.
 */
@Category(LargeTests.class)
public class TestFlushWithThroughputController {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFlushWithThroughputController.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestFlushWithThroughputController.class);
  /** Tolerance used when comparing throughput values expressed as doubles. */
  private static final double EPSILON = 1.3E-6;

  private HBaseTestingUtility hbtu;
  @Rule
  public TestName testName = new TestName();
  private TableName tableName;
  private final byte[] family = Bytes.toBytes("f");
  private final byte[] qualifier = Bytes.toBytes("q");

  /**
   * Creates a fresh testing utility, derives the table name from the running test method and
   * selects {@link PressureAwareFlushThroughputController} as the flush throughput controller.
   * The mini cluster itself is started by each test after it has finished tweaking the
   * configuration.
   */
  @Before
  public void setUp() {
    hbtu = new HBaseTestingUtility();
    tableName = TableName.valueOf("Table-" + testName.getMethodName());
    hbtu.getConfiguration().set(
      FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareFlushThroughputController.class.getName());
  }

  /** Shuts down the mini cluster started by the test. */
  @After
  public void tearDown() throws Exception {
    hbtu.shutdownMiniCluster();
  }

  /**
   * Finds the first store of the first region of the given table hosted on any region server of
   * the mini cluster.
   * @param tableName table whose store is wanted
   * @return the first store found, or {@code null} if no region server hosts a region of the
   *         table
   */
  private HStore getStoreWithName(TableName tableName) {
    MiniHBaseCluster cluster = hbtu.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      // Only the first region matters, so return as soon as one is seen.
      for (Region region : hrs.getRegions(tableName)) {
        return ((HRegion) region).getStores().iterator().next();
      }
    }
    return null;
  }

  /**
   * Sets the upper and lower bounds of the flush max throughput in the cluster configuration.
   * @param max value for the upper bound
   * @param min value for the lower bound
   */
  private void setMaxMinThroughputs(long max, long min) {
    Configuration conf = hbtu.getConfiguration();
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, min);
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, max);
  }

  /**
   * Writes Puts to the table and flushes few times.
   * @param table table to write to; the caller remains responsible for closing it
   * @return {@link Pair} of (throughput in bytes per second, total flush duration in nanoseconds).
   */
  private Pair<Double, Long> generateAndFlushData(Table table) throws IOException {
    // Internally, throughput is controlled after every cell write, so keep value size less for
    // better control.
    final int NUM_FLUSHES = 3, NUM_PUTS = 50, VALUE_SIZE = 200 * 1024;
    long duration = 0;
    for (int i = 0; i < NUM_FLUSHES; i++) {
      // Write about 10M (10 times of throughput rate) per iteration.
      for (int j = 0; j < NUM_PUTS; j++) {
        byte[] value = new byte[VALUE_SIZE];
        Bytes.random(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      // Only the flush itself is timed, not the writes above.
      long startTime = System.nanoTime();
      hbtu.getHBaseCluster().getRegions(tableName).stream().findFirst().ifPresent(r -> {
        try {
          r.flush(true);
        } catch (IOException e) {
          LOG.error("Failed flush region {}", r, e);
          fail("Failed flush region " + r.getRegionInfo().getRegionNameAsString());
        }
      });
      duration += System.nanoTime() - startTime;
    }
    HStore store = getStoreWithName(tableName);
    // Fail with a readable message instead of a NullPointerException when no store is found.
    assertTrue("No store found for table " + tableName, store != null);
    assertEquals(NUM_FLUSHES, store.getStorefilesCount());
    // Use fractional seconds: TimeUnit.NANOSECONDS.toSeconds() truncates to whole seconds, which
    // overstates the throughput and divides by zero for runs shorter than one second.
    double durationSeconds = (double) duration / TimeUnit.SECONDS.toNanos(1);
    double throughput = store.getStorefilesSize() / durationSeconds;
    return new Pair<>(throughput, duration);
  }

  /**
   * Starts a single-node mini cluster with the flush throughput bounds pinned to 1 MB/s, writes
   * and flushes data, and asserts that the measured flush throughput is within 20% of the limit.
   * @return total time spent flushing, in nanoseconds
   */
  private long testFlushWithThroughputLimit() throws Exception {
    final long throughputLimit = 1024 * 1024;
    setMaxMinThroughputs(throughputLimit, throughputLimit);
    Configuration conf = hbtu.getConfiguration();
    conf.setLong(
      PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL,
      throughputLimit);
    hbtu.startMiniCluster(1);
    Pair<Double, Long> result;
    // Close the table handle even when writing or flushing fails.
    try (Table table = hbtu.createTable(tableName, family)) {
      result = generateAndFlushData(table);
    }
    hbtu.deleteTable(tableName);
    LOG.debug("Throughput is: {} MB/s", result.getFirst() / 1024 / 1024);
    // confirm that the speed limit work properly(not too fast, and also not too slow)
    // 20% is the max acceptable error rate.
    assertTrue("Flush throughput too high: " + result.getFirst(),
      result.getFirst() < throughputLimit * 1.2);
    assertTrue("Flush throughput too low: " + result.getFirst(),
      result.getFirst() > throughputLimit * 0.8);
    return result.getSecond();
  }

  /**
   * Test the flush throughput limit with the configured default store engine.
   */
  @Test
  public void testFlushControl() throws Exception {
    testFlushWithThroughputLimit();
  }

  /**
   * Test the tuning task of {@link PressureAwareFlushThroughputController}
   */
  @Test
  public void testFlushThroughputTuning() throws Exception {
    Configuration conf = hbtu.getConfiguration();
    setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
      3000);
    hbtu.startMiniCluster(1);
    // try-with-resources so the connection is released even when an assertion below fails.
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      hbtu.getAdmin()
        .createTable(TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
          .build());
      hbtu.waitTableAvailable(tableName);
      HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
      double pressure = regionServer.getFlushPressure();
      LOG.debug("Flush pressure before flushing: {}", pressure);
      PressureAwareFlushThroughputController throughputController =
        (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
      for (HRegion region : regionServer.getRegions()) {
        region.flush(true);
      }
      // We used to assert that the flush pressure is zero but after HBASE-15787 or HBASE-18294 we
      // changed to use heapSize instead of dataSize to calculate the flush pressure, and since
      // heapSize will never be zero, so flush pressure will never be zero either. So we changed
      // the assertion here.
      assertTrue(regionServer.getFlushPressure() < pressure);
      // Sleep longer than the 3000 ms tune period configured above before reading the value.
      Thread.sleep(5000);
      boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration());
      if (tablesOnMaster) {
        // If no tables on the master, this math is off and I'm not sure what it is supposed to be
        // when meta is on the regionserver and not on the master.
        assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
      }
      try (Table table = conn.getTable(tableName)) {
        for (int i = 0; i < 10; i++) {
          for (int j = 0; j < 10; j++) {
            byte[] value = new byte[256 * 1024];
            Bytes.random(value);
            table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
          }
        }
      }
      // Again wait longer than the tune period so the new pressure is reflected.
      Thread.sleep(5000);
      double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
      assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);

      // Switching the configured controller class must stop the old controller and install the
      // new one.
      conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
      regionServer.onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(
        regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
    }
  }

  /**
   * Test the logic for striped store.
   */
  @Test
  public void testFlushControlForStripedStore() throws Exception {
    hbtu.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY,
      StripeStoreEngine.class.getName());
    testFlushWithThroughputLimit();
  }
}