001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.Map;
031import java.util.Map.Entry;
032import java.util.Set;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.NamespaceNotFoundException;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.master.HMaster;
044import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
045import org.apache.hadoop.hbase.testclassification.LargeTests;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Rule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053import org.junit.rules.TestName;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
058import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
061
062/**
063 * Test class for {@link QuotaObserverChore} that uses a live HBase cluster.
064 */
065@Category(LargeTests.class)
066public class TestQuotaObserverChoreWithMiniCluster {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestQuotaObserverChoreWithMiniCluster.class);
071
072  private static final Logger LOG =
073    LoggerFactory.getLogger(TestQuotaObserverChoreWithMiniCluster.class);
074  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075  private static final AtomicLong COUNTER = new AtomicLong(0);
076  private static final long DEFAULT_WAIT_MILLIS = 500;
077
078  @Rule
079  public TestName testName = new TestName();
080
081  private HMaster master;
082  private QuotaObserverChore chore;
083  private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
084  private SpaceQuotaHelperForTests helper;
085
086  @BeforeClass
087  public static void setUp() throws Exception {
088    Configuration conf = TEST_UTIL.getConfiguration();
089    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
090    conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
091      SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
092    TEST_UTIL.startMiniCluster(1);
093  }
094
095  @AfterClass
096  public static void tearDown() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  @Before
101  public void removeAllQuotas() throws Exception {
102    final Connection conn = TEST_UTIL.getConnection();
103    if (helper == null) {
104      helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
105    }
106    // Wait for the quota table to be created
107    if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
108      helper.waitForQuotaTable(conn);
109    } else {
110      // Or, clean up any quotas from previous test runs.
111      helper.removeAllQuotas(conn);
112      assertEquals(0, helper.listNumDefinedQuotas(conn));
113    }
114
115    master = TEST_UTIL.getMiniHBaseCluster().getMaster();
116    snapshotNotifier = (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier();
117    assertNotNull(snapshotNotifier);
118    snapshotNotifier.clearSnapshots();
119    chore = master.getQuotaObserverChore();
120  }
121
122  @Test
123  public void testTableViolatesQuota() throws Exception {
124    TableName tn = helper.createTableWithRegions(10);
125
126    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
127    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
128    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
129    TEST_UTIL.getAdmin().setQuota(settings);
130
131    // Write more data than should be allowed
132    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
133
134    Map<TableName, SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots();
135    boolean foundSnapshot = false;
136    while (!foundSnapshot) {
137      if (quotaSnapshots.isEmpty()) {
138        LOG.info("Found no violated quotas, sleeping and retrying. Current reports: "
139          + master.getMasterQuotaManager().snapshotRegionSizes());
140        sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
141        quotaSnapshots = snapshotNotifier.copySnapshots();
142      } else {
143        Entry<TableName, SpaceQuotaSnapshot> entry =
144          Iterables.getOnlyElement(quotaSnapshots.entrySet());
145        assertEquals(tn, entry.getKey());
146        final SpaceQuotaSnapshot snapshot = entry.getValue();
147        if (!snapshot.getQuotaStatus().isInViolation()) {
148          LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot);
149          sleepWithInterrupt(DEFAULT_WAIT_MILLIS);
150          quotaSnapshots = snapshotNotifier.copySnapshots();
151        } else {
152          foundSnapshot = true;
153        }
154      }
155    }
156
157    Entry<TableName, SpaceQuotaSnapshot> entry =
158      Iterables.getOnlyElement(quotaSnapshots.entrySet());
159    assertEquals(tn, entry.getKey());
160    final SpaceQuotaSnapshot snapshot = entry.getValue();
161    assertEquals("Snapshot was " + snapshot, violationPolicy,
162      snapshot.getQuotaStatus().getPolicy().get());
163    assertEquals(sizeLimit, snapshot.getLimit());
164    assertTrue("The usage should be greater than the limit, but were " + snapshot.getUsage()
165      + " and " + snapshot.getLimit() + ", respectively",
166      snapshot.getUsage() > snapshot.getLimit());
167  }
168
169  @Test
170  public void testNamespaceViolatesQuota() throws Exception {
171    final String namespace = testName.getMethodName();
172    final Admin admin = TEST_UTIL.getAdmin();
173    // Ensure the namespace exists
174    try {
175      admin.getNamespaceDescriptor(namespace);
176    } catch (NamespaceNotFoundException e) {
177      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
178      admin.createNamespace(desc);
179    }
180
181    TableName tn1 = helper.createTableWithRegions(namespace, 5);
182    TableName tn2 = helper.createTableWithRegions(namespace, 5);
183    TableName tn3 = helper.createTableWithRegions(namespace, 5);
184
185    final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
186    final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
187    QuotaSettings settings =
188      QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
189    admin.setQuota(settings);
190
191    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
192    admin.flush(tn1);
193    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
194    for (int i = 0; i < 5; i++) {
195      // Check a few times to make sure we don't prematurely move to violation
196      assertEquals("Should not see any quota violations after writing 2MB of data", 0,
197        numSnapshotsInViolation(snapshots));
198      try {
199        Thread.sleep(DEFAULT_WAIT_MILLIS);
200      } catch (InterruptedException e) {
201        LOG.debug("Interrupted while sleeping.", e);
202      }
203      snapshots = snapshotNotifier.copySnapshots();
204    }
205
206    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
207    admin.flush(tn2);
208    snapshots = snapshotNotifier.copySnapshots();
209    for (int i = 0; i < 5; i++) {
210      // Check a few times to make sure we don't prematurely move to violation
211      assertEquals("Should not see any quota violations after writing 4MB of data", 0,
212        numSnapshotsInViolation(snapshots));
213      try {
214        Thread.sleep(DEFAULT_WAIT_MILLIS);
215      } catch (InterruptedException e) {
216        LOG.debug("Interrupted while sleeping.", e);
217      }
218      snapshots = snapshotNotifier.copySnapshots();
219    }
220
221    // Writing the final 2MB of data will push the namespace over the 5MB limit (6MB in total)
222    // and should push all three tables in the namespace into violation.
223    helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
224    admin.flush(tn3);
225    snapshots = snapshotNotifier.copySnapshots();
226    while (numSnapshotsInViolation(snapshots) < 3) {
227      LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots
228        + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
229      try {
230        Thread.sleep(DEFAULT_WAIT_MILLIS);
231      } catch (InterruptedException e) {
232        LOG.debug("Interrupted while sleeping.", e);
233        Thread.currentThread().interrupt();
234      }
235      snapshots = snapshotNotifier.copySnapshots();
236    }
237
238    SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
239    assertNotNull("tn1 should be in violation", snapshot1);
240    assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy().get());
241    SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
242    assertNotNull("tn2 should be in violation", snapshot2);
243    assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy().get());
244    SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
245    assertNotNull("tn3 should be in violation", snapshot3);
246    assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy().get());
247    assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
248  }
249
250  @Test
251  public void testTableQuotaOverridesNamespaceQuota() throws Exception {
252    final String namespace = testName.getMethodName();
253    final Admin admin = TEST_UTIL.getAdmin();
254    // Ensure the namespace exists
255    try {
256      admin.getNamespaceDescriptor(namespace);
257    } catch (NamespaceNotFoundException e) {
258      NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
259      admin.createNamespace(desc);
260    }
261
262    TableName tn1 = helper.createTableWithRegions(namespace, 5);
263    TableName tn2 = helper.createTableWithRegions(namespace, 5);
264
265    final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
266    final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
267    QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
268      namespaceSizeLimit, namespaceViolationPolicy);
269    admin.setQuota(namespaceSettings);
270
271    helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
272    admin.flush(tn1);
273    Map<TableName, SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
274    for (int i = 0; i < 5; i++) {
275      // Check a few times to make sure we don't prematurely move to violation
276      assertEquals("Should not see any quota violations after writing 2MB of data: " + snapshots, 0,
277        numSnapshotsInViolation(snapshots));
278      try {
279        Thread.sleep(DEFAULT_WAIT_MILLIS);
280      } catch (InterruptedException e) {
281        LOG.debug("Interrupted while sleeping.", e);
282      }
283      snapshots = snapshotNotifier.copySnapshots();
284    }
285
286    helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
287    admin.flush(tn2);
288    snapshots = snapshotNotifier.copySnapshots();
289    while (numSnapshotsInViolation(snapshots) < 2) {
290      LOG.debug("Saw fewer violations than desired (expected 2): " + snapshots
291        + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
292      try {
293        Thread.sleep(DEFAULT_WAIT_MILLIS);
294      } catch (InterruptedException e) {
295        LOG.debug("Interrupted while sleeping.", e);
296        Thread.currentThread().interrupt();
297      }
298      snapshots = snapshotNotifier.copySnapshots();
299    }
300
301    SpaceQuotaSnapshot actualPolicyTN1 = snapshots.get(tn1);
302    assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
303    assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy().get());
304    SpaceQuotaSnapshot actualPolicyTN2 = snapshots.get(tn2);
305    assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
306    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
307
308    // Override the namespace quota with a table quota
309    final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
310    final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
311    QuotaSettings tableSettings =
312      QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit, tableViolationPolicy);
313    admin.setQuota(tableSettings);
314
315    // Keep checking for the table quota policy to override the namespace quota
316    while (true) {
317      snapshots = snapshotNotifier.copySnapshots();
318      SpaceQuotaSnapshot actualTableSnapshot = snapshots.get(tn1);
319      assertNotNull("Violation policy should never be null", actualTableSnapshot);
320      if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy().orElse(null)) {
321        LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
322        try {
323          Thread.sleep(DEFAULT_WAIT_MILLIS);
324        } catch (InterruptedException e) {
325          LOG.debug("Interrupted while sleeping");
326          Thread.currentThread().interrupt();
327        }
328        continue;
329      }
330      assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy().get());
331      break;
332    }
333
334    // This should not change with the introduction of the table quota for tn1
335    actualPolicyTN2 = snapshots.get(tn2);
336    assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
337    assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy().get());
338  }
339
340  @Test
341  public void testGetAllTablesWithQuotas() throws Exception {
342    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
343    Set<TableName> tablesWithQuotas = new HashSet<>();
344    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
345    // Partition the tables with quotas by table and ns quota
346    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
347
348    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
349    assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
350    assertEquals("Found tables: " + tables, namespaceTablesWithQuotas,
351      tables.getNamespaceQuotaTables());
352  }
353
354  @Test
355  public void testRpcQuotaTablesAreFiltered() throws Exception {
356    final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
357    Set<TableName> tablesWithQuotas = new HashSet<>();
358    Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
359    // Partition the tables with quotas by table and ns quota
360    helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
361
362    TableName rpcQuotaTable = helper.createTable();
363    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.throttleTable(rpcQuotaTable,
364      ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
365
366    // The `rpcQuotaTable` should not be included in this Set
367    TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
368    assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
369    assertEquals("Found tables: " + tables, namespaceTablesWithQuotas,
370      tables.getNamespaceQuotaTables());
371  }
372
373  @Test
374  public void testFilterRegions() throws Exception {
375    Map<TableName, Integer> mockReportedRegions = new HashMap<>();
376    // Can't mock because of primitive int as a return type -- Mockito
377    // can only handle an Integer.
378    TablesWithQuotas tables =
379      new TablesWithQuotas(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()) {
380        @Override
381        int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) {
382          Integer i = mockReportedRegions.get(table);
383          if (i == null) {
384            return 0;
385          }
386          return i;
387        }
388      };
389
390    // Create the tables
391    TableName tn1 = helper.createTableWithRegions(20);
392    TableName tn2 = helper.createTableWithRegions(20);
393    TableName tn3 = helper.createTableWithRegions(20);
394
395    // Add them to the Tables with Quotas object
396    tables.addTableQuotaTable(tn1);
397    tables.addTableQuotaTable(tn2);
398    tables.addTableQuotaTable(tn3);
399
400    // Mock the number of regions reported
401    mockReportedRegions.put(tn1, 10); // 50%
402    mockReportedRegions.put(tn2, 19); // 95%
403    mockReportedRegions.put(tn3, 20); // 100%
404
405    // Argument is un-used
406    tables.filterInsufficientlyReportedTables(null);
407    // The default of 95% reported should prevent tn1 from appearing
408    assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
409  }
410
411  @Test
412  public void testFetchSpaceQuota() throws Exception {
413    Multimap<TableName, QuotaSettings> tables = helper.createTablesWithSpaceQuotas();
414    // Can pass in an empty map, we're not consulting it.
415    chore.initializeSnapshotStores(Collections.emptyMap());
416    // All tables that were created should have a quota defined.
417    for (Entry<TableName, QuotaSettings> entry : tables.entries()) {
418      final TableName table = entry.getKey();
419      final QuotaSettings qs = entry.getValue();
420
421      assertTrue("QuotaSettings was an instance of " + qs.getClass(),
422        qs instanceof SpaceLimitSettings);
423
424      SpaceQuota spaceQuota = null;
425      if (qs.getTableName() != null) {
426        spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
427        assertNotNull("Could not find table space quota for " + table, spaceQuota);
428      } else if (qs.getNamespace() != null) {
429        spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
430        assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(),
431          spaceQuota);
432      } else {
433        fail("Expected table or namespace space quota");
434      }
435
436      final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
437      assertEquals(sls.getProto().getQuota(), spaceQuota);
438    }
439
440    TableName tableWithoutQuota = helper.createTable();
441    assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
442  }
443
444  private int numSnapshotsInViolation(Map<TableName, SpaceQuotaSnapshot> snapshots) {
445    int sum = 0;
446    for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
447      if (snapshot.getQuotaStatus().isInViolation()) {
448        sum++;
449      }
450    }
451    return sum;
452  }
453
454  private void sleepWithInterrupt(long millis) {
455    try {
456      Thread.sleep(millis);
457    } catch (InterruptedException e) {
458      LOG.debug("Interrupted while sleeping");
459      Thread.currentThread().interrupt();
460    }
461  }
462}