001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Map.Entry; 029import java.util.concurrent.atomic.AtomicLong; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileStatus; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.Append; 040import org.apache.hadoop.hbase.client.ClientServiceCallable; 041import org.apache.hadoop.hbase.client.ClusterConnection; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.Delete; 044import org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Mutation; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.ResultScanner; 050import org.apache.hadoop.hbase.client.RpcRetryingCaller; 051import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 052import org.apache.hadoop.hbase.client.Scan; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.master.HMaster; 055import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement; 056import org.apache.hadoop.hbase.regionserver.HRegionServer; 057import org.apache.hadoop.hbase.security.AccessDeniedException; 058import org.apache.hadoop.hbase.testclassification.LargeTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.util.StringUtils; 061import org.junit.AfterClass; 062import org.junit.Before; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072/** 073 * End-to-end test class for filesystem space quotas. 074 */ 075@Category(LargeTests.class) 076public class TestSpaceQuotas { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestSpaceQuotas.class); 081 082 private static final Logger LOG = LoggerFactory.getLogger(TestSpaceQuotas.class); 083 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 084 // Global for all tests in the class 085 private static final AtomicLong COUNTER = new AtomicLong(0); 086 private static final int NUM_RETRIES = 10; 087 088 @Rule 089 public TestName testName = new TestName(); 090 private SpaceQuotaHelperForTests helper; 091 092 @BeforeClass 093 public static void setUp() throws Exception { 094 Configuration conf = TEST_UTIL.getConfiguration(); 095 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 096 TEST_UTIL.startMiniCluster(1); 097 } 098 099 @AfterClass 100 public static void tearDown() throws Exception { 101 TEST_UTIL.shutdownMiniCluster(); 102 } 103 104 @Before 105 public void removeAllQuotas() throws Exception { 106 final Connection conn = TEST_UTIL.getConnection(); 107 if (helper == null) { 108 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 109 } 110 // Wait for the quota table to be created 111 if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { 112 helper.waitForQuotaTable(conn); 113 } else { 114 // Or, clean up any quotas from previous test runs. 115 helper.removeAllQuotas(conn); 116 assertEquals(0, helper.listNumDefinedQuotas(conn)); 117 } 118 } 119 120 @Test 121 public void testNoInsertsWithPut() throws Exception { 122 Put p = new Put(Bytes.toBytes("to_reject")); 123 p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 124 Bytes.toBytes("reject")); 125 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, p); 126 } 127 128 @Test 129 public void testNoInsertsWithAppend() throws Exception { 130 Append a = new Append(Bytes.toBytes("to_reject")); 131 a.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 132 Bytes.toBytes("reject")); 133 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, a); 134 } 135 136 @Test 137 public void testNoInsertsWithIncrement() throws Exception { 138 Increment i = new Increment(Bytes.toBytes("to_reject")); 139 i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0); 140 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, i); 141 } 142 143 @Test 144 public void testDeletesAfterNoInserts() throws Exception { 145 final TableName tn = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS); 146 // Try a couple of times to verify that the quota never gets enforced, same as we 147 // do when we're trying to catch the failure. 148 Delete d = new Delete(Bytes.toBytes("should_not_be_rejected")); 149 for (int i = 0; i < NUM_RETRIES; i++) { 150 try (Table t = TEST_UTIL.getConnection().getTable(tn)) { 151 t.delete(d); 152 } 153 } 154 } 155 156 @Test 157 public void testNoWritesWithPut() throws Exception { 158 Put p = new Put(Bytes.toBytes("to_reject")); 159 p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 160 Bytes.toBytes("reject")); 161 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); 162 } 163 164 @Test 165 public void testNoWritesWithAppend() throws Exception { 166 Append a = new Append(Bytes.toBytes("to_reject")); 167 a.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 168 Bytes.toBytes("reject")); 169 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, a); 170 } 171 172 @Test 173 public void testNoWritesWithIncrement() throws Exception { 174 Increment i = new Increment(Bytes.toBytes("to_reject")); 175 i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0); 176 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, i); 177 } 178 179 @Test 180 public void testNoWritesWithDelete() throws Exception { 181 Delete d = new Delete(Bytes.toBytes("to_reject")); 182 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, d); 183 } 184 185 @Test 186 public void testNoCompactions() throws Exception { 187 Put p = new Put(Bytes.toBytes("to_reject")); 188 p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 189 Bytes.toBytes("reject")); 190 final TableName tn = 191 writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES_COMPACTIONS, p); 192 // We know the policy is active at this point 193 194 // Major compactions should be rejected 195 try { 196 TEST_UTIL.getAdmin().majorCompact(tn); 197 fail("Expected that invoking the compaction should throw an Exception"); 198 } catch (DoNotRetryIOException e) { 199 // Expected! 200 } 201 // Minor compactions should also be rejected. 202 try { 203 TEST_UTIL.getAdmin().compact(tn); 204 fail("Expected that invoking the compaction should throw an Exception"); 205 } catch (DoNotRetryIOException e) { 206 // Expected! 207 } 208 } 209 210 @Test 211 public void testNoEnableAfterDisablePolicy() throws Exception { 212 Put p = new Put(Bytes.toBytes("to_reject")); 213 p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 214 Bytes.toBytes("reject")); 215 final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE); 216 final Admin admin = TEST_UTIL.getAdmin(); 217 // Disabling a table relies on some external action (over the other policies), so wait a bit 218 // more than the other tests. 219 for (int i = 0; i < NUM_RETRIES * 2; i++) { 220 if (admin.isTableEnabled(tn)) { 221 LOG.info(tn + " is still enabled, expecting it to be disabled. Will wait and re-check."); 222 Thread.sleep(2000); 223 } 224 } 225 assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn)); 226 try { 227 admin.enableTable(tn); 228 } catch (AccessDeniedException e) { 229 String exceptionContents = StringUtils.stringifyException(e); 230 final String expectedText = "violated space quota"; 231 assertTrue( 232 "Expected the exception to contain " + expectedText + ", but was: " + exceptionContents, 233 exceptionContents.contains(expectedText)); 234 } 235 } 236 237 @Test 238 public void testNoBulkLoadsWithNoWrites() throws Exception { 239 Put p = new Put(Bytes.toBytes("to_reject")); 240 p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 241 Bytes.toBytes("reject")); 242 TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p); 243 244 // The table is now in violation. Try to do a bulk load 245 ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50); 246 ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); 247 RpcRetryingCallerFactory factory = 248 new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration()); 249 RpcRetryingCaller<Void> caller = factory.newCaller(); 250 try { 251 caller.callWithRetries(callable, Integer.MAX_VALUE); 252 fail("Expected the bulk load call to fail!"); 253 } catch (SpaceLimitingException e) { 254 // Pass 255 LOG.trace("Caught expected exception", e); 256 } 257 } 258 259 @Test 260 public void testAtomicBulkLoadUnderQuota() throws Exception { 261 // Need to verify that if the batch of hfiles cannot be loaded, none are loaded. 262 TableName tn = helper.createTableWithRegions(10); 263 264 final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE; 265 QuotaSettings settings = 266 QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS); 267 TEST_UTIL.getAdmin().setQuota(settings); 268 269 HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); 270 RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager(); 271 Map<TableName, SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots(); 272 Map<RegionInfo, Long> regionSizes = getReportedSizesForTable(tn); 273 while (true) { 274 SpaceQuotaSnapshot snapshot = snapshots.get(tn); 275 if (snapshot != null && snapshot.getLimit() > 0) { 276 break; 277 } 278 LOG.debug("Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: " 279 + regionSizes); 280 Thread.sleep(3000); 281 snapshots = spaceQuotaManager.copyQuotaSnapshots(); 282 regionSizes = getReportedSizesForTable(tn); 283 } 284 // Our quota limit should be reflected in the latest snapshot 285 SpaceQuotaSnapshot snapshot = snapshots.get(tn); 286 assertEquals(0L, snapshot.getUsage()); 287 assertEquals(sizeLimit, snapshot.getLimit()); 288 289 // We would also not have a "real" policy in violation 290 ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements(); 291 SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn); 292 assertTrue("Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(), 293 enforcement instanceof DefaultViolationPolicyEnforcement); 294 295 // Should generate two files, each of which is over 25KB each 296 ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 2, 525); 297 FileSystem fs = TEST_UTIL.getTestFileSystem(); 298 FileStatus[] files = 299 fs.listStatus(new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files")); 300 for (FileStatus file : files) { 301 assertTrue("Expected the file, " + file.getPath() 302 + ", length to be larger than 25KB, but was " + file.getLen(), 303 file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE); 304 LOG.debug(file.getPath() + " -> " + file.getLen() + "B"); 305 } 306 307 ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); 308 RpcRetryingCallerFactory factory = 309 new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration()); 310 RpcRetryingCaller<Void> caller = factory.newCaller(); 311 try { 312 caller.callWithRetries(callable, Integer.MAX_VALUE); 313 fail("Expected the bulk load call to fail!"); 314 } catch (SpaceLimitingException e) { 315 // Pass 316 LOG.trace("Caught expected exception", e); 317 } 318 // Verify that we have no data in the table because neither file should have been 319 // loaded even though one of the files could have. 320 Table table = TEST_UTIL.getConnection().getTable(tn); 321 ResultScanner scanner = table.getScanner(new Scan()); 322 try { 323 assertNull("Expected no results", scanner.next()); 324 } finally { 325 scanner.close(); 326 } 327 } 328 329 @Test 330 public void testTableQuotaOverridesNamespaceQuota() throws Exception { 331 final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS; 332 final TableName tn = helper.createTableWithRegions(10); 333 334 // 2MB limit on the table, 1GB limit on the namespace 335 final long tableLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 336 final long namespaceLimit = 1024L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 337 TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy)); 338 TEST_UTIL.getAdmin().setQuota( 339 QuotaSettingsFactory.limitNamespaceSpace(tn.getNamespaceAsString(), namespaceLimit, policy)); 340 341 // Write more data than should be allowed and flush it to disk 342 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 343 344 // This should be sufficient time for the chores to run and see the change. 345 Thread.sleep(5000); 346 347 // The write should be rejected because the table quota takes priority over the namespace 348 Put p = new Put(Bytes.toBytes("to_reject")); 349 p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), 350 Bytes.toBytes("reject")); 351 verifyViolation(policy, tn, p); 352 } 353 354 private Map<RegionInfo, Long> getReportedSizesForTable(TableName tn) { 355 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 356 MasterQuotaManager quotaManager = master.getMasterQuotaManager(); 357 Map<RegionInfo, Long> filteredRegionSizes = new HashMap<>(); 358 for (Entry<RegionInfo, Long> entry : quotaManager.snapshotRegionSizes().entrySet()) { 359 if (entry.getKey().getTable().equals(tn)) { 360 filteredRegionSizes.put(entry.getKey(), entry.getValue()); 361 } 362 } 363 return filteredRegionSizes; 364 } 365 366 private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception { 367 TableName tn = helper.createTableWithRegions(10); 368 369 final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 370 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, policyToViolate); 371 TEST_UTIL.getAdmin().setQuota(settings); 372 373 // Write more data than should be allowed and flush it to disk 374 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 375 376 // This should be sufficient time for the chores to run and see the change. 377 Thread.sleep(5000); 378 379 return tn; 380 } 381 382 private TableName writeUntilViolationAndVerifyViolation(SpaceViolationPolicy policyToViolate, 383 Mutation m) throws Exception { 384 final TableName tn = writeUntilViolation(policyToViolate); 385 verifyViolation(policyToViolate, tn, m); 386 return tn; 387 } 388 389 private void verifyViolation(SpaceViolationPolicy policyToViolate, TableName tn, Mutation m) 390 throws Exception { 391 // But let's try a few times to get the exception before failing 392 boolean sawError = false; 393 for (int i = 0; i < NUM_RETRIES && !sawError; i++) { 394 try (Table table = TEST_UTIL.getConnection().getTable(tn)) { 395 if (m instanceof Put) { 396 table.put((Put) m); 397 } else if (m instanceof Delete) { 398 table.delete((Delete) m); 399 } else if (m instanceof Append) { 400 table.append((Append) m); 401 } else if (m instanceof Increment) { 402 table.increment((Increment) m); 403 } else { 404 fail( 405 "Failed to apply " + m.getClass().getSimpleName() + " to the table. Programming error"); 406 } 407 LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry"); 408 Thread.sleep(2000); 409 } catch (Exception e) { 410 String msg = StringUtils.stringifyException(e); 411 assertTrue("Expected exception message to contain the word '" + policyToViolate.name() 412 + "', but was " + msg, msg.contains(policyToViolate.name())); 413 sawError = true; 414 } 415 } 416 if (!sawError) { 417 try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 418 ResultScanner scanner = quotaTable.getScanner(new Scan()); 419 Result result = null; 420 LOG.info("Dumping contents of hbase:quota table"); 421 while ((result = scanner.next()) != null) { 422 LOG.info(Bytes.toString(result.getRow()) + " => " + result.toString()); 423 } 424 scanner.close(); 425 } 426 } 427 assertTrue("Expected to see an exception writing data to a table exceeding its quota", 428 sawError); 429 } 430}