001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.Map; 031import java.util.Map.Entry; 032import java.util.Set; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicLong; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.NamespaceNotFoundException; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.Admin; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.master.HMaster; 044import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas; 045import org.apache.hadoop.hbase.testclassification.LargeTests; 046import org.junit.AfterClass; 047import org.junit.Before; 048import org.junit.BeforeClass; 049import org.junit.ClassRule; 050import org.junit.Rule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.junit.rules.TestName; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 058import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota; 061 062/** 063 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster. 064 */ 065@Category(LargeTests.class) 066public class TestQuotaObserverChoreWithMiniCluster { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestQuotaObserverChoreWithMiniCluster.class); 071 072 private static final Logger LOG = 073 LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class); 074 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 075 private static final AtomicLong COUNTER = new AtomicLong(0); 076 private static final long DEFAULT_WAIT_MILLIS = 500; 077 078 @Rule 079 public TestName testName = new TestName(); 080 081 private HMaster master; 082 private QuotaObserverChore chore; 083 private SpaceQuotaSnapshotNotifierForTest snapshotNotifier; 084 private SpaceQuotaHelperForTests helper; 085 086 @BeforeClass 087 public static void setUp() throws Exception { 088 Configuration conf = TEST_UTIL.getConfiguration(); 089 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 090 conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY, 091 SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class); 092 TEST_UTIL.startMiniCluster(1); 093 } 094 095 @AfterClass 096 public static void tearDown() throws Exception { 097 TEST_UTIL.shutdownMiniCluster(); 098 } 099 100 @Before 101 public void removeAllQuotas() throws Exception { 102 final Connection conn = TEST_UTIL.getConnection(); 103 if (helper == null) { 104 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 105 } 106 // Wait for the quota table to be created 107 if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) { 108 helper.waitForQuotaTable(conn); 109 } else { 110 // Or, clean up any quotas from previous test runs. 111 helper.removeAllQuotas(conn); 112 assertEquals(0, helper.listNumDefinedQuotas(conn)); 113 } 114 115 master = TEST_UTIL.getMiniHBaseCluster().getMaster(); 116 snapshotNotifier = (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier(); 117 assertNotNull(snapshotNotifier); 118 snapshotNotifier.clearSnapshots(); 119 chore = master.getQuotaObserverChore(); 120 } 121 122 @Test 123 public void testTableViolatesQuota() throws Exception { 124 TableName tn = helper.createTableWithRegions(10); 125 126 final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 127 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS; 128 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy); 129 TEST_UTIL.getAdmin().setQuota(settings); 130 131 // Write more data than should be allowed 132 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 133 134 Map<TableName, SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots(); 135 boolean foundSnapshot = false; 136 while (!foundSnapshot) { 137 if (quotaSnapshots.isEmpty()) { 138 LOG.info("Found no violated quotas, sleeping and retrying. Current reports: " 139 + master.getMasterQuotaManager().snapshotRegionSizes()); 140 sleepWithInterrupt(DEFAULT_WAIT_MILLIS); 141 quotaSnapshots = snapshotNotifier.copySnapshots(); 142 } else { 143 Entry<TableName, SpaceQuotaSnapshot> entry = 144 Iterables.getOnlyElement(quotaSnapshots.entrySet()); 145 assertEquals(tn, entry.getKey()); 146 final SpaceQuotaSnapshot snapshot = entry.getValue(); 147 if (!snapshot.getQuotaStatus().isInViolation()) { 148 LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot); 149 sleepWithInterrupt(DEFAULT_WAIT_MILLIS); 150 quotaSnapshots = snapshotNotifier.copySnapshots(); 151 } else { 152 foundSnapshot = true; 153 } 154 } 155 } 156 157 Entry<TableName, SpaceQuotaSnapshot> entry = 158 Iterables.getOnlyElement(quotaSnapshots.entrySet()); 159 assertEquals(tn, entry.getKey()); 160 final SpaceQuotaSnapshot snapshot = entry.getValue(); 161 assertEquals("Snapshot was " + snapshot, violationPolicy, 162 snapshot.getQuotaStatus().getPolicy().get()); 163 assertEquals(sizeLimit, snapshot.getLimit()); 164 assertTrue("The usage should be greater than the limit, but were " + snapshot.getUsage() 165 + " and " + snapshot.getLimit() + ", respectively", 166 snapshot.getUsage() > snapshot.getLimit()); 167 } 168 169 @Test 170 public void testNamespaceViolatesQuota() throws Exception { 171 final String namespace = testName.getMethodName(); 172 final Admin admin = TEST_UTIL.getAdmin(); 173 // Ensure the namespace exists 174 try { 175 admin.getNamespaceDescriptor(namespace); 176 } catch (NamespaceNotFoundException e) { 177 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); 178 admin.createNamespace(desc); 179 } 180 181 TableName tn1 = helper.createTableWithRegions(namespace, 5); 182 TableName tn2 = helper.createTableWithRegions(namespace, 5); 183 TableName tn3 = helper.createTableWithRegions(namespace, 5); 184 185 final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 186 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE; 187 QuotaSettings settings = 188 QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy); 189 admin.setQuota(settings); 190 191 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 192 admin.flush(tn1); 193 Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots(); 194 for (int i = 0; i < 5; i++) { 195 // Check a few times to make sure we don't prematurely move to violation 196 assertEquals("Should not see any quota violations after writing 2MB of data", 0, 197 numSnapshotsInViolation(snapshots)); 198 try { 199 Thread.sleep(DEFAULT_WAIT_MILLIS); 200 } catch (InterruptedException e) { 201 LOG.debug("Interrupted while sleeping.", e); 202 } 203 snapshots = snapshotNotifier.copySnapshots(); 204 } 205 206 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 207 admin.flush(tn2); 208 snapshots = snapshotNotifier.copySnapshots(); 209 for (int i = 0; i < 5; i++) { 210 // Check a few times to make sure we don't prematurely move to violation 211 assertEquals("Should not see any quota violations after writing 4MB of data", 0, 212 numSnapshotsInViolation(snapshots)); 213 try { 214 Thread.sleep(DEFAULT_WAIT_MILLIS); 215 } catch (InterruptedException e) { 216 LOG.debug("Interrupted while sleeping.", e); 217 } 218 snapshots = snapshotNotifier.copySnapshots(); 219 } 220 221 // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total) 222 // and should push all three tables in the namespace into violation. 223 helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 224 admin.flush(tn3); 225 snapshots = snapshotNotifier.copySnapshots(); 226 while (numSnapshotsInViolation(snapshots) < 3) { 227 LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots 228 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); 229 try { 230 Thread.sleep(DEFAULT_WAIT_MILLIS); 231 } catch (InterruptedException e) { 232 LOG.debug("Interrupted while sleeping.", e); 233 Thread.currentThread().interrupt(); 234 } 235 snapshots = snapshotNotifier.copySnapshots(); 236 } 237 238 SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1); 239 assertNotNull("tn1 should be in violation", snapshot1); 240 assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get()); 241 SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2); 242 assertNotNull("tn2 should be in violation", snapshot2); 243 assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get()); 244 SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3); 245 assertNotNull("tn3 should be in violation", snapshot3); 246 assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get()); 247 assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty()); 248 } 249 250 @Test 251 public void testTableQuotaOverridesNamespaceQuota() throws Exception { 252 final String namespace = testName.getMethodName(); 253 final Admin admin = TEST_UTIL.getAdmin(); 254 // Ensure the namespace exists 255 try { 256 admin.getNamespaceDescriptor(namespace); 257 } catch (NamespaceNotFoundException e) { 258 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build(); 259 admin.createNamespace(desc); 260 } 261 262 TableName tn1 = helper.createTableWithRegions(namespace, 5); 263 TableName tn2 = helper.createTableWithRegions(namespace, 5); 264 265 final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE; 266 final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE; 267 QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace, 268 namespaceSizeLimit, namespaceViolationPolicy); 269 admin.setQuota(namespaceSettings); 270 271 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 272 admin.flush(tn1); 273 Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots(); 274 for (int i = 0; i < 5; i++) { 275 // Check a few times to make sure we don't prematurely move to violation 276 assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0, 277 numSnapshotsInViolation(snapshots)); 278 try { 279 Thread.sleep(DEFAULT_WAIT_MILLIS); 280 } catch (InterruptedException e) { 281 LOG.debug("Interrupted while sleeping.", e); 282 } 283 snapshots = snapshotNotifier.copySnapshots(); 284 } 285 286 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE); 287 admin.flush(tn2); 288 snapshots = snapshotNotifier.copySnapshots(); 289 while (numSnapshotsInViolation(snapshots) < 2) { 290 LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots 291 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes()); 292 try { 293 Thread.sleep(DEFAULT_WAIT_MILLIS); 294 } catch (InterruptedException e) { 295 LOG.debug("Interrupted while sleeping.", e); 296 Thread.currentThread().interrupt(); 297 } 298 snapshots = snapshotNotifier.copySnapshots(); 299 } 300 301 SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1); 302 assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1); 303 assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get()); 304 SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2); 305 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); 306 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get()); 307 308 // Override the namespace quota with a table quota 309 final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE; 310 final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS; 311 QuotaSettings tableSettings = 312 QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, tableViolationPolicy); 313 admin.setQuota(tableSettings); 314 315 // Keep checking for the table quota policy to override the namespace quota 316 while (true) { 317 snapshots = snapshotNotifier.copySnapshots(); 318 SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1); 319 assertNotNull("Violation policy should never be null", actualTableSnapshot); 320 if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) { 321 LOG.debug("Saw unexpected table violation policy, waiting and re-checking."); 322 try { 323 Thread.sleep(DEFAULT_WAIT_MILLIS); 324 } catch (InterruptedException e) { 325 LOG.debug("Interrupted while sleeping"); 326 Thread.currentThread().interrupt(); 327 } 328 continue; 329 } 330 assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get()); 331 break; 332 } 333 334 // This should not change with the introduction of the table quota for tn1 335 actualPolicyTN2 = snapshots.get(tn2); 336 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2); 337 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get()); 338 } 339 340 @Test 341 public void testGetAllTablesWithQuotas() throws Exception { 342 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); 343 Set<TableName> tablesWithQuotas = new HashSet<>(); 344 Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); 345 // Partition the tables with quotas by table and ns quota 346 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); 347 348 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); 349 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); 350 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, 351 tables.getNamespaceQuotaTables()); 352 } 353 354 @Test 355 public void testRpcQuotaTablesAreFiltered() throws Exception { 356 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas(); 357 Set<TableName> tablesWithQuotas = new HashSet<>(); 358 Set<TableName> namespaceTablesWithQuotas = new HashSet<>(); 359 // Partition the tables with quotas by table and ns quota 360 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas); 361 362 TableName rpcQuotaTable = helper.createTable(); 363 TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.throttleTable(rpcQuotaTable, 364 ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES)); 365 366 // The `rpcQuotaTable` should not be included in this Set 367 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined(); 368 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables()); 369 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, 370 tables.getNamespaceQuotaTables()); 371 } 372 373 @Test 374 public void testFilterRegions() throws Exception { 375 Map<TableName, Integer> mockReportedRegions = new HashMap<>(); 376 // Can't mock because of primitive int as a return type -- Mockito 377 // can only handle an Integer. 378 TablesWithQuotas tables = 379 new TablesWithQuotas(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()) { 380 @Override 381 int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) { 382 Integer i = mockReportedRegions.get(table); 383 if (i == null) { 384 return 0; 385 } 386 return i; 387 } 388 }; 389 390 // Create the tables 391 TableName tn1 = helper.createTableWithRegions(20); 392 TableName tn2 = helper.createTableWithRegions(20); 393 TableName tn3 = helper.createTableWithRegions(20); 394 395 // Add them to the Tables with Quotas object 396 tables.addTableQuotaTable(tn1); 397 tables.addTableQuotaTable(tn2); 398 tables.addTableQuotaTable(tn3); 399 400 // Mock the number of regions reported 401 mockReportedRegions.put(tn1, 10); // 50% 402 mockReportedRegions.put(tn2, 19); // 95% 403 mockReportedRegions.put(tn3, 20); // 100% 404 405 // Argument is un-used 406 tables.filterInsufficientlyReportedTables(null); 407 // The default of 95% reported should prevent tn1 from appearing 408 assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables()); 409 } 410 411 @Test 412 public void testFetchSpaceQuota() throws Exception { 413 Multimap<TableName, QuotaSettings> tables = helper.createTablesWithSpaceQuotas(); 414 // Can pass in an empty map, we're not consulting it. 415 chore.initializeSnapshotStores(Collections.emptyMap()); 416 // All tables that were created should have a quota defined. 417 for (Entry<TableName, QuotaSettings> entry : tables.entries()) { 418 final TableName table = entry.getKey(); 419 final QuotaSettings qs = entry.getValue(); 420 421 assertTrue("QuotaSettings was an instance of " + qs.getClass(), 422 qs instanceof SpaceLimitSettings); 423 424 SpaceQuota spaceQuota = null; 425 if (qs.getTableName() != null) { 426 spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table); 427 assertNotNull("Could not find table space quota for " + table, spaceQuota); 428 } else if (qs.getNamespace() != null) { 429 spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString()); 430 assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), 431 spaceQuota); 432 } else { 433 fail("Expected table or namespace space quota"); 434 } 435 436 final SpaceLimitSettings sls = (SpaceLimitSettings) qs; 437 assertEquals(sls.getProto().getQuota(), spaceQuota); 438 } 439 440 TableName tableWithoutQuota = helper.createTable(); 441 assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota)); 442 } 443 444 private int numSnapshotsInViolation(Map<TableName, SpaceQuotaSnapshot> snapshots) { 445 int sum = 0; 446 for (SpaceQuotaSnapshot snapshot : snapshots.values()) { 447 if (snapshot.getQuotaStatus().isInViolation()) { 448 sum++; 449 } 450 } 451 return sum; 452 } 453 454 private void sleepWithInterrupt(long millis) { 455 try { 456 Thread.sleep(millis); 457 } catch (InterruptedException e) { 458 LOG.debug("Interrupted while sleeping"); 459 Thread.currentThread().interrupt(); 460 } 461 } 462}