/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

/**
 * Verifies that the space quota usage reported for a table changes shortly after flushes,
 * compactions, bulk loads and snapshot archival. The periodic filesystem-utilization and
 * snapshot-size chores are configured with a two minute period and initial delay, while every
 * wait in this class times out after 30 seconds, so an observed change cannot be attributed to
 * those chores.
 */
@Category({ MediumTests.class })
public class TestLowLatencySpaceQuotas {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLowLatencySpaceQuotas.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  // Global for all tests in the class
  private static final AtomicLong COUNTER = new AtomicLong(0);

  /** Upper bound, in milliseconds, on how long a test waits for a usage change to show up. */
  private static final long WAIT_TIMEOUT_MS = 30 * 1000;
  /** Interval, in milliseconds, between two evaluations of a usage condition. */
  private static final long WAIT_INTERVAL_MS = 500;

  @Rule
  public TestName testName = new TestName();
  private SpaceQuotaHelperForTests helper;
  private Connection conn;
  private Admin admin;

  /**
   * Configures quota support and starts a single-node mini cluster shared by all tests.
   */
  @BeforeClass
  public static void setup() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // The default 1s period for QuotaObserverChore is good.
    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
    // Set the period/delay to read region size from HDFS to be very long
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000 * 120);
    // Set the same long period/delay to compute snapshot sizes
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, 1000 * 120);
    conf.setInt(SnapshotQuotaObserverChore.SNAPSHOT_QUOTA_CHORE_DELAY_KEY, 1000 * 120);
    // Clean up the compacted files faster than normal (5s instead of 2mins)
    conf.setInt("hbase.hfile.compaction.discharger.interval", 5 * 1000);

    TEST_UTIL.startMiniCluster(1);
  }

  /**
   * Shuts the shared mini cluster down once all tests have run.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Per-test initialization. Despite the name, this method does not remove any quota: it builds
   * a fresh helper, grabs the shared connection and admin, and waits for the quota table.
   */
  @Before
  public void removeAllQuotas() throws Exception {
    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
    conn = TEST_UTIL.getConnection();
    admin = TEST_UTIL.getAdmin();
    helper.waitForQuotaTable(conn);
  }

  /**
   * Flushing written data must be reflected in the table's reported usage.
   */
  @Test
  public void testFlushes() throws Exception {
    TableName tn = createTableWithSpaceQuota();

    // Write some data
    final long initialSize = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, initialSize);

    // Make sure a flush happened
    admin.flush(tn);

    // We should be able to observe the system recording an increase in size (even
    // though we know the filesystem scanning did not happen).
    waitForUsage(tn, usage -> usage >= initialSize);
  }

  /**
   * A major compaction that merges two hfiles holding the same data must bring the reported
   * usage back down.
   */
  @Test
  public void testMajorCompaction() throws Exception {
    TableName tn = createTableWithSpaceQuota();

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // Write the same data again, flushing it to a second file
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    // After two flushes, both hfiles would contain similar data. We should see 2x the data.
    waitForUsage(tn, usage -> usage >= 2L * sizePerBatch);

    // Rewrite the two files into one.
    admin.majorCompact(tn);

    // After we major compact the table, we should notice quickly that the amount of data in the
    // table is much closer to reality (the duplicate entries across the two files are removed).
    waitForUsage(tn, usage -> usage >= sizePerBatch && usage <= 2L * sizePerBatch);
  }

  /**
   * A minor compaction that rewrites some hfiles into fewer must be reflected in the reported
   * usage.
   */
  @Test
  public void testMinorCompaction() throws Exception {
    TableName tn = createTableWithSpaceQuota();

    // Write several batches, flushing each one into its own file.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    final long numBatches = 6;
    for (long i = 0; i < numBatches; i++) {
      helper.writeData(tn, sizePerBatch);
      admin.flush(tn);
    }

    HRegion region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    final long numFiles = getNumHFilesForRegion(region);
    assertEquals(numBatches, numFiles);

    // One hfile per flush: the usage should be at least one batch per file.
    waitForUsage(tn, usage -> usage >= numFiles * sizePerBatch);

    // Rewrite some files into fewer
    TEST_UTIL.compact(tn, false);
    final long numFilesAfterMinorCompaction = getNumHFilesForRegion(region);

    // After the minor compaction, the reported usage should promptly fall within the window
    // implied by the number of hfiles that remain.
    waitForUsage(tn, usage -> usage >= numFilesAfterMinorCompaction * sizePerBatch
      && usage <= (numFilesAfterMinorCompaction + 1) * sizePerBatch);
  }

  /**
   * Sums the number of hfiles over every store of the given region.
   * @param region the region to inspect
   * @return the total number of hfiles in the region
   */
  private long getNumHFilesForRegion(HRegion region) {
    return region.getStores().stream().mapToLong(s -> s.getNumHFiles()).sum();
  }

  /**
   * Bulk loaded files must be reflected in the table's reported usage. Compactions are switched
   * off on all region servers for the duration of the test and are always switched back on,
   * even when the test fails part way through.
   */
  @Test
  public void testBulkLoading() throws Exception {
    TableName tn = createTableWithSpaceQuota();
    setCompactionsEnabled(false);
    try {
      ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 3, 550);
      // Make sure the files are about as long as we expect
      FileSystem fs = TEST_UTIL.getTestFileSystem();
      FileStatus[] files =
        fs.listStatus(new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
      long totalSize = 0;
      for (FileStatus file : files) {
        assertTrue("Expected the file, " + file.getPath()
          + ", length to be larger than 25KB, but was " + file.getLen(),
          file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
        totalSize += file.getLen();
      }

      final ClusterConnection clusterConn = (ClusterConnection) conn;
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(
        TEST_UTIL.getConfiguration(), clusterConn.getConnectionConfiguration());
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      final long finalTotalSize = totalSize;
      waitForUsage(tn, usage -> usage >= finalTotalSize);
    } finally {
      // The mini cluster is shared by every test in this class; never leave compactions off.
      setCompactionsEnabled(true);
    }
  }

  /**
   * A file that is moved to the archive by a major compaction, but is still referenced by a
   * snapshot, must keep counting against the table, and the snapshot size must be recorded in
   * the quota table for both the table and its namespace.
   */
  @Test
  public void testSnapshotSizes() throws Exception {
    TableName tn = createTableWithSpaceQuota();

    // Write some data and flush it to disk.
    final long sizePerBatch = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
    helper.writeData(tn, sizePerBatch);
    admin.flush(tn);

    final String snapshot1 = "snapshot1";
    admin.snapshot(snapshot1, tn, SnapshotType.SKIPFLUSH);

    // Compute the size of the file for the Region we'll send to archive
    Region region = Iterables.getOnlyElement(TEST_UTIL.getHBaseCluster().getRegions(tn));
    List<? extends Store> stores = region.getStores();
    final long storeFileSize = stores.stream().mapToLong(Store::getStorefilesSize).sum();

    // Wait for the table to show the usage
    waitForUsage(tn, usage -> usage == storeFileSize);

    // Spoof a "full" computation of snapshot size. Normally the chore handles this, but we want
    // to test in the absence of this chore.
    FileArchiverNotifier notifier = TEST_UTIL.getHBaseCluster().getMaster()
      .getSnapshotQuotaObserverChore().getNotifierForTable(tn);
    notifier.computeAndStoreSnapshotSizes(Collections.singletonList(snapshot1));

    // Force a major compaction to create a new file and push the old file to the archive
    TEST_UTIL.compact(tn, true);

    // After moving the old file to archive/, the space of this table should double
    // We have a new file created by the majc referenced by the table and the snapshot still
    // referencing the old file.
    waitForUsage(tn, usage -> usage >= 2 * storeFileSize);

    try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      // Size recorded for the snapshot itself
      Result r = quotaTable.get(QuotaTableUtil.makeGetForSnapshotSize(tn, snapshot1));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the origin store file",
        storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));

      // Size recorded for the namespace the table lives in
      r = quotaTable.get(QuotaTableUtil.createGetNamespaceSnapshotSize(tn.getNamespaceAsString()));
      assertTrue("Expected a non-null, non-empty Result", r != null && !r.isEmpty());
      assertTrue(r.advance());
      assertEquals("The snapshot's size should be the same as the origin store file",
        storeFileSize, QuotaTableUtil.parseSnapshotSize(r.current()));
    }
  }

  /**
   * Creates a single-region table and puts a one gigabyte, {@code NO_INSERTS} space quota on it.
   * @return the name of the created table
   */
  private TableName createTableWithSpaceQuota() throws Exception {
    TableName tn = helper.createTableWithRegions(1);
    // Set a quota
    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn,
      SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
    admin.setQuota(settings);
    return tn;
  }

  /**
   * Polls the quota snapshot of the given table until its usage satisfies the condition, using
   * the timeout and interval defined by this class.
   * @param tn             the table whose quota snapshot is inspected
   * @param usageCondition condition the reported usage, in bytes, has to satisfy
   */
  private void waitForUsage(TableName tn, LongPredicate usageCondition) throws Exception {
    TEST_UTIL.waitFor(WAIT_TIMEOUT_MS, WAIT_INTERVAL_MS,
      new SpaceQuotaSnapshotPredicate(conn, tn) {
        @Override
        boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
          return usageCondition.test(snapshot.getUsage());
        }
      });
  }

  /**
   * Switches compactions on or off on every region server currently known to the admin.
   * @param enabled {@code true} to enable compactions, {@code false} to disable them
   */
  private void setCompactionsEnabled(boolean enabled) throws Exception {
    List<String> serverNames =
      admin.getRegionServers().stream().map(ServerName::toString).collect(Collectors.toList());
    admin.compactionSwitch(enabled, serverNames);
  }
}