001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND; 021import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND; 022import static org.apache.hadoop.hbase.regionserver.throttle.PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertTrue; 025 026import java.io.IOException; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Admin; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 040import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; 041import org.apache.hadoop.hbase.regionserver.HRegion; 042import org.apache.hadoop.hbase.regionserver.HRegionServer; 043import org.apache.hadoop.hbase.regionserver.HStore; 044import org.apache.hadoop.hbase.regionserver.Region; 045import org.apache.hadoop.hbase.regionserver.StoreEngine; 046import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 047import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; 048import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.testclassification.RegionServerTests; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 053import org.apache.hadoop.hbase.util.JVMClusterUtil; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060@Category({ RegionServerTests.class, LargeTests.class }) 061public class TestCompactionWithThroughputController { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestCompactionWithThroughputController.class); 066 067 private static final Logger LOG = 068 LoggerFactory.getLogger(TestCompactionWithThroughputController.class); 069 070 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 071 072 private static final double EPSILON = 1E-6; 073 074 private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); 075 076 private final byte[] family = Bytes.toBytes("f"); 077 078 private final byte[] qualifier = Bytes.toBytes("q"); 079 080 private HStore getStoreWithName(TableName tableName) { 081 SingleProcessHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); 082 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); 083 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { 084 HRegionServer hrs = rsts.get(i).getRegionServer(); 085 for (Region region : hrs.getRegions(tableName)) { 086 return ((HRegion) region).getStores().iterator().next(); 087 } 088 } 089 return null; 090 } 091 092 private HStore prepareData() throws IOException { 093 Admin admin = TEST_UTIL.getAdmin(); 094 if (admin.tableExists(tableName)) { 095 admin.disableTable(tableName); 096 admin.deleteTable(tableName); 097 } 098 Table table = TEST_UTIL.createTable(tableName, family); 099 for (int i = 0; i < 10; i++) { 100 for (int j = 0; j < 10; j++) { 101 byte[] value = new byte[128 * 1024]; 102 Bytes.random(value); 103 table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); 104 } 105 admin.flush(tableName); 106 } 107 return getStoreWithName(tableName); 108 } 109 110 private long testCompactionWithThroughputLimit() throws Exception { 111 long throughputLimit = 1024L * 1024; 112 Configuration conf = TEST_UTIL.getConfiguration(); 113 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 114 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); 115 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); 116 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 117 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, throughputLimit); 118 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, throughputLimit); 119 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 120 PressureAwareCompactionThroughputController.class.getName()); 121 TEST_UTIL.startMiniCluster(1); 122 try { 123 HStore store = prepareData(); 124 assertEquals(10, store.getStorefilesCount()); 125 long startTime = EnvironmentEdgeManager.currentTime(); 126 TEST_UTIL.getAdmin().majorCompact(tableName); 127 while (store.getStorefilesCount() != 1) { 128 Thread.sleep(20); 129 } 130 long duration = EnvironmentEdgeManager.currentTime() - startTime; 131 double throughput = (double) store.getStorefilesSize() / duration * 1000; 132 // confirm that the speed limit work properly(not too fast, and also not too slow) 133 // 20% is the max acceptable error rate. 134 assertTrue(throughput < throughputLimit * 1.2); 135 assertTrue(throughput > throughputLimit * 0.8); 136 return EnvironmentEdgeManager.currentTime() - startTime; 137 } finally { 138 TEST_UTIL.shutdownMiniCluster(); 139 } 140 } 141 142 private long testCompactionWithoutThroughputLimit() throws Exception { 143 Configuration conf = TEST_UTIL.getConfiguration(); 144 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 145 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100); 146 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200); 147 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); 148 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 149 NoLimitThroughputController.class.getName()); 150 TEST_UTIL.startMiniCluster(1); 151 try { 152 HStore store = prepareData(); 153 assertEquals(10, store.getStorefilesCount()); 154 long startTime = EnvironmentEdgeManager.currentTime(); 155 TEST_UTIL.getAdmin().majorCompact(tableName); 156 while (store.getStorefilesCount() != 1) { 157 Thread.sleep(20); 158 } 159 return EnvironmentEdgeManager.currentTime() - startTime; 160 } finally { 161 TEST_UTIL.shutdownMiniCluster(); 162 } 163 } 164 165 @Test 166 public void testCompaction() throws Exception { 167 long limitTime = testCompactionWithThroughputLimit(); 168 long noLimitTime = testCompactionWithoutThroughputLimit(); 169 LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use " 170 + noLimitTime + "ms"); 171 // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this 172 // is a very weak assumption. 173 assertTrue(limitTime > noLimitTime * 2); 174 } 175 176 /** 177 * Test the tuning task of {@link PressureAwareCompactionThroughputController} 178 */ 179 @Test 180 public void testThroughputTuning() throws Exception { 181 Configuration conf = TEST_UTIL.getConfiguration(); 182 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); 183 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND, 20L * 1024 * 1024); 184 conf.setLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND, 10L * 1024 * 1024); 185 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4); 186 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6); 187 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 188 PressureAwareCompactionThroughputController.class.getName()); 189 conf.setInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD, 1000); 190 TEST_UTIL.startMiniCluster(1); 191 Connection conn = ConnectionFactory.createConnection(conf); 192 try { 193 TEST_UTIL.getAdmin() 194 .createTable(TableDescriptorBuilder.newBuilder(tableName) 195 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 196 .build()); 197 TEST_UTIL.waitTableAvailable(tableName); 198 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); 199 PressureAwareCompactionThroughputController throughputController = 200 (PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread() 201 .getCompactionThroughputController(); 202 assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 203 Table table = conn.getTable(tableName); 204 for (int i = 0; i < 5; i++) { 205 byte[] value = new byte[0]; 206 table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value)); 207 TEST_UTIL.flush(tableName); 208 } 209 Thread.sleep(2000); 210 assertEquals(15L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 211 212 byte[] value1 = new byte[0]; 213 table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value1)); 214 TEST_UTIL.flush(tableName); 215 Thread.sleep(2000); 216 assertEquals(20L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON); 217 218 byte[] value = new byte[0]; 219 table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value)); 220 TEST_UTIL.flush(tableName); 221 Thread.sleep(2000); 222 assertEquals(Double.MAX_VALUE, throughputController.getMaxThroughput(), EPSILON); 223 224 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, 225 NoLimitThroughputController.class.getName()); 226 regionServer.getCompactSplitThread().onConfigurationChange(conf); 227 assertTrue(throughputController.isStopped()); 228 assertTrue(regionServer.getCompactSplitThread() 229 .getCompactionThroughputController() instanceof NoLimitThroughputController); 230 } finally { 231 conn.close(); 232 TEST_UTIL.shutdownMiniCluster(); 233 } 234 } 235 236 /** 237 * Test the logic that we calculate compaction pressure for a striped store. 238 */ 239 @Test 240 public void testGetCompactionPressureForStripedStore() throws Exception { 241 Configuration conf = TEST_UTIL.getConfiguration(); 242 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); 243 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false); 244 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2); 245 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4); 246 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12); 247 TEST_UTIL.startMiniCluster(1); 248 Connection conn = ConnectionFactory.createConnection(conf); 249 try { 250 TEST_UTIL.getAdmin() 251 .createTable(TableDescriptorBuilder.newBuilder(tableName) 252 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false) 253 .build()); 254 TEST_UTIL.waitTableAvailable(tableName); 255 HStore store = getStoreWithName(tableName); 256 assertEquals(0, store.getStorefilesCount()); 257 assertEquals(0.0, store.getCompactionPressure(), EPSILON); 258 Table table = conn.getTable(tableName); 259 for (int i = 0; i < 4; i++) { 260 byte[] value1 = new byte[0]; 261 table.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, value1)); 262 byte[] value = new byte[0]; 263 table.put(new Put(Bytes.toBytes(100 + i)).addColumn(family, qualifier, value)); 264 TEST_UTIL.flush(tableName); 265 } 266 assertEquals(8, store.getStorefilesCount()); 267 assertEquals(0.0, store.getCompactionPressure(), EPSILON); 268 269 byte[] value5 = new byte[0]; 270 table.put(new Put(Bytes.toBytes(4)).addColumn(family, qualifier, value5)); 271 byte[] value4 = new byte[0]; 272 table.put(new Put(Bytes.toBytes(104)).addColumn(family, qualifier, value4)); 273 TEST_UTIL.flush(tableName); 274 assertEquals(10, store.getStorefilesCount()); 275 assertEquals(0.5, store.getCompactionPressure(), EPSILON); 276 277 byte[] value3 = new byte[0]; 278 table.put(new Put(Bytes.toBytes(5)).addColumn(family, qualifier, value3)); 279 byte[] value2 = new byte[0]; 280 table.put(new Put(Bytes.toBytes(105)).addColumn(family, qualifier, value2)); 281 TEST_UTIL.flush(tableName); 282 assertEquals(12, store.getStorefilesCount()); 283 assertEquals(1.0, store.getCompactionPressure(), EPSILON); 284 285 byte[] value1 = new byte[0]; 286 table.put(new Put(Bytes.toBytes(6)).addColumn(family, qualifier, value1)); 287 byte[] value = new byte[0]; 288 table.put(new Put(Bytes.toBytes(106)).addColumn(family, qualifier, value)); 289 TEST_UTIL.flush(tableName); 290 assertEquals(14, store.getStorefilesCount()); 291 assertEquals(2.0, store.getCompactionPressure(), EPSILON); 292 } finally { 293 conn.close(); 294 TEST_UTIL.shutdownMiniCluster(); 295 } 296 } 297}