001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.concurrent.atomic.AtomicLong;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileStatus;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.DoNotRetryIOException;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.Append;
040import org.apache.hadoop.hbase.client.ClientServiceCallable;
041import org.apache.hadoop.hbase.client.ClusterConnection;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.RpcRetryingCaller;
051import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
052import org.apache.hadoop.hbase.client.Scan;
053import org.apache.hadoop.hbase.client.Table;
054import org.apache.hadoop.hbase.master.HMaster;
055import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement;
056import org.apache.hadoop.hbase.regionserver.HRegionServer;
057import org.apache.hadoop.hbase.security.AccessDeniedException;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.util.StringUtils;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TestName;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072/**
073 * End-to-end test class for filesystem space quotas.
074 */
075@Category(LargeTests.class)
076public class TestSpaceQuotas {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestSpaceQuotas.class);
081
082  private static final Logger LOG = LoggerFactory.getLogger(TestSpaceQuotas.class);
083  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084  // Global for all tests in the class
085  private static final AtomicLong COUNTER = new AtomicLong(0);
086  private static final int NUM_RETRIES = 10;
087
088  @Rule
089  public TestName testName = new TestName();
090  private SpaceQuotaHelperForTests helper;
091
092  @BeforeClass
093  public static void setUp() throws Exception {
094    Configuration conf = TEST_UTIL.getConfiguration();
095    SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
096    TEST_UTIL.startMiniCluster(1);
097  }
098
099  @AfterClass
100  public static void tearDown() throws Exception {
101    TEST_UTIL.shutdownMiniCluster();
102  }
103
104  @Before
105  public void removeAllQuotas() throws Exception {
106    final Connection conn = TEST_UTIL.getConnection();
107    if (helper == null) {
108      helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
109    }
110    // Wait for the quota table to be created
111    if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
112      helper.waitForQuotaTable(conn);
113    } else {
114      // Or, clean up any quotas from previous test runs.
115      helper.removeAllQuotas(conn);
116      assertEquals(0, helper.listNumDefinedQuotas(conn));
117    }
118  }
119
120  @Test
121  public void testNoInsertsWithPut() throws Exception {
122    Put p = new Put(Bytes.toBytes("to_reject"));
123    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
124      Bytes.toBytes("reject"));
125    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, p);
126  }
127
128  @Test
129  public void testNoInsertsWithAppend() throws Exception {
130    Append a = new Append(Bytes.toBytes("to_reject"));
131    a.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
132      Bytes.toBytes("reject"));
133    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, a);
134  }
135
136  @Test
137  public void testNoInsertsWithIncrement() throws Exception {
138    Increment i = new Increment(Bytes.toBytes("to_reject"));
139    i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0);
140    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_INSERTS, i);
141  }
142
143  @Test
144  public void testDeletesAfterNoInserts() throws Exception {
145    final TableName tn = writeUntilViolation(SpaceViolationPolicy.NO_INSERTS);
146    // Try a couple of times to verify that the quota never gets enforced, same as we
147    // do when we're trying to catch the failure.
148    Delete d = new Delete(Bytes.toBytes("should_not_be_rejected"));
149    for (int i = 0; i < NUM_RETRIES; i++) {
150      try (Table t = TEST_UTIL.getConnection().getTable(tn)) {
151        t.delete(d);
152      }
153    }
154  }
155
156  @Test
157  public void testNoWritesWithPut() throws Exception {
158    Put p = new Put(Bytes.toBytes("to_reject"));
159    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
160      Bytes.toBytes("reject"));
161    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p);
162  }
163
164  @Test
165  public void testNoWritesWithAppend() throws Exception {
166    Append a = new Append(Bytes.toBytes("to_reject"));
167    a.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
168      Bytes.toBytes("reject"));
169    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, a);
170  }
171
172  @Test
173  public void testNoWritesWithIncrement() throws Exception {
174    Increment i = new Increment(Bytes.toBytes("to_reject"));
175    i.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("count"), 0);
176    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, i);
177  }
178
179  @Test
180  public void testNoWritesWithDelete() throws Exception {
181    Delete d = new Delete(Bytes.toBytes("to_reject"));
182    writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, d);
183  }
184
185  @Test
186  public void testNoCompactions() throws Exception {
187    Put p = new Put(Bytes.toBytes("to_reject"));
188    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
189      Bytes.toBytes("reject"));
190    final TableName tn =
191      writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES_COMPACTIONS, p);
192    // We know the policy is active at this point
193
194    // Major compactions should be rejected
195    try {
196      TEST_UTIL.getAdmin().majorCompact(tn);
197      fail("Expected that invoking the compaction should throw an Exception");
198    } catch (DoNotRetryIOException e) {
199      // Expected!
200    }
201    // Minor compactions should also be rejected.
202    try {
203      TEST_UTIL.getAdmin().compact(tn);
204      fail("Expected that invoking the compaction should throw an Exception");
205    } catch (DoNotRetryIOException e) {
206      // Expected!
207    }
208  }
209
210  @Test
211  public void testNoEnableAfterDisablePolicy() throws Exception {
212    Put p = new Put(Bytes.toBytes("to_reject"));
213    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
214      Bytes.toBytes("reject"));
215    final TableName tn = writeUntilViolation(SpaceViolationPolicy.DISABLE);
216    final Admin admin = TEST_UTIL.getAdmin();
217    // Disabling a table relies on some external action (over the other policies), so wait a bit
218    // more than the other tests.
219    for (int i = 0; i < NUM_RETRIES * 2; i++) {
220      if (admin.isTableEnabled(tn)) {
221        LOG.info(tn + " is still enabled, expecting it to be disabled. Will wait and re-check.");
222        Thread.sleep(2000);
223      }
224    }
225    assertFalse(tn + " is still enabled but it should be disabled", admin.isTableEnabled(tn));
226    try {
227      admin.enableTable(tn);
228    } catch (AccessDeniedException e) {
229      String exceptionContents = StringUtils.stringifyException(e);
230      final String expectedText = "violated space quota";
231      assertTrue(
232        "Expected the exception to contain " + expectedText + ", but was: " + exceptionContents,
233        exceptionContents.contains(expectedText));
234    }
235  }
236
237  @Test
238  public void testNoBulkLoadsWithNoWrites() throws Exception {
239    Put p = new Put(Bytes.toBytes("to_reject"));
240    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
241      Bytes.toBytes("reject"));
242    TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p);
243
244    // The table is now in violation. Try to do a bulk load
245    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tableName, 1, 50);
246    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
247    RpcRetryingCallerFactory factory =
248      new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
249    RpcRetryingCaller<Void> caller = factory.newCaller();
250    try {
251      caller.callWithRetries(callable, Integer.MAX_VALUE);
252      fail("Expected the bulk load call to fail!");
253    } catch (SpaceLimitingException e) {
254      // Pass
255      LOG.trace("Caught expected exception", e);
256    }
257  }
258
259  @Test
260  public void testAtomicBulkLoadUnderQuota() throws Exception {
261    // Need to verify that if the batch of hfiles cannot be loaded, none are loaded.
262    TableName tn = helper.createTableWithRegions(10);
263
264    final long sizeLimit = 50L * SpaceQuotaHelperForTests.ONE_KILOBYTE;
265    QuotaSettings settings =
266      QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, SpaceViolationPolicy.NO_INSERTS);
267    TEST_UTIL.getAdmin().setQuota(settings);
268
269    HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
270    RegionServerSpaceQuotaManager spaceQuotaManager = rs.getRegionServerSpaceQuotaManager();
271    Map<TableName, SpaceQuotaSnapshot> snapshots = spaceQuotaManager.copyQuotaSnapshots();
272    Map<RegionInfo, Long> regionSizes = getReportedSizesForTable(tn);
273    while (true) {
274      SpaceQuotaSnapshot snapshot = snapshots.get(tn);
275      if (snapshot != null && snapshot.getLimit() > 0) {
276        break;
277      }
278      LOG.debug("Snapshot does not yet realize quota limit: " + snapshots + ", regionsizes: "
279        + regionSizes);
280      Thread.sleep(3000);
281      snapshots = spaceQuotaManager.copyQuotaSnapshots();
282      regionSizes = getReportedSizesForTable(tn);
283    }
284    // Our quota limit should be reflected in the latest snapshot
285    SpaceQuotaSnapshot snapshot = snapshots.get(tn);
286    assertEquals(0L, snapshot.getUsage());
287    assertEquals(sizeLimit, snapshot.getLimit());
288
289    // We would also not have a "real" policy in violation
290    ActivePolicyEnforcement activePolicies = spaceQuotaManager.getActiveEnforcements();
291    SpaceViolationPolicyEnforcement enforcement = activePolicies.getPolicyEnforcement(tn);
292    assertTrue("Expected to find Noop policy, but got " + enforcement.getClass().getSimpleName(),
293      enforcement instanceof DefaultViolationPolicyEnforcement);
294
295    // Should generate two files, each of which is over 25KB each
296    ClientServiceCallable<Void> callable = helper.generateFileToLoad(tn, 2, 525);
297    FileSystem fs = TEST_UTIL.getTestFileSystem();
298    FileStatus[] files =
299      fs.listStatus(new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
300    for (FileStatus file : files) {
301      assertTrue("Expected the file, " + file.getPath()
302        + ",  length to be larger than 25KB, but was " + file.getLen(),
303        file.getLen() > 25 * SpaceQuotaHelperForTests.ONE_KILOBYTE);
304      LOG.debug(file.getPath() + " -> " + file.getLen() + "B");
305    }
306
307    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
308    RpcRetryingCallerFactory factory =
309      new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration(), conn.getConnectionConfiguration());
310    RpcRetryingCaller<Void> caller = factory.newCaller();
311    try {
312      caller.callWithRetries(callable, Integer.MAX_VALUE);
313      fail("Expected the bulk load call to fail!");
314    } catch (SpaceLimitingException e) {
315      // Pass
316      LOG.trace("Caught expected exception", e);
317    }
318    // Verify that we have no data in the table because neither file should have been
319    // loaded even though one of the files could have.
320    Table table = TEST_UTIL.getConnection().getTable(tn);
321    ResultScanner scanner = table.getScanner(new Scan());
322    try {
323      assertNull("Expected no results", scanner.next());
324    } finally {
325      scanner.close();
326    }
327  }
328
329  @Test
330  public void testTableQuotaOverridesNamespaceQuota() throws Exception {
331    final SpaceViolationPolicy policy = SpaceViolationPolicy.NO_INSERTS;
332    final TableName tn = helper.createTableWithRegions(10);
333
334    // 2MB limit on the table, 1GB limit on the namespace
335    final long tableLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
336    final long namespaceLimit = 1024L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
337    TEST_UTIL.getAdmin().setQuota(QuotaSettingsFactory.limitTableSpace(tn, tableLimit, policy));
338    TEST_UTIL.getAdmin().setQuota(
339      QuotaSettingsFactory.limitNamespaceSpace(tn.getNamespaceAsString(), namespaceLimit, policy));
340
341    // Write more data than should be allowed and flush it to disk
342    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
343
344    // This should be sufficient time for the chores to run and see the change.
345    Thread.sleep(5000);
346
347    // The write should be rejected because the table quota takes priority over the namespace
348    Put p = new Put(Bytes.toBytes("to_reject"));
349    p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
350      Bytes.toBytes("reject"));
351    verifyViolation(policy, tn, p);
352  }
353
354  private Map<RegionInfo, Long> getReportedSizesForTable(TableName tn) {
355    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
356    MasterQuotaManager quotaManager = master.getMasterQuotaManager();
357    Map<RegionInfo, Long> filteredRegionSizes = new HashMap<>();
358    for (Entry<RegionInfo, Long> entry : quotaManager.snapshotRegionSizes().entrySet()) {
359      if (entry.getKey().getTable().equals(tn)) {
360        filteredRegionSizes.put(entry.getKey(), entry.getValue());
361      }
362    }
363    return filteredRegionSizes;
364  }
365
366  private TableName writeUntilViolation(SpaceViolationPolicy policyToViolate) throws Exception {
367    TableName tn = helper.createTableWithRegions(10);
368
369    final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
370    QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, policyToViolate);
371    TEST_UTIL.getAdmin().setQuota(settings);
372
373    // Write more data than should be allowed and flush it to disk
374    helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
375
376    // This should be sufficient time for the chores to run and see the change.
377    Thread.sleep(5000);
378
379    return tn;
380  }
381
382  private TableName writeUntilViolationAndVerifyViolation(SpaceViolationPolicy policyToViolate,
383    Mutation m) throws Exception {
384    final TableName tn = writeUntilViolation(policyToViolate);
385    verifyViolation(policyToViolate, tn, m);
386    return tn;
387  }
388
389  private void verifyViolation(SpaceViolationPolicy policyToViolate, TableName tn, Mutation m)
390    throws Exception {
391    // But let's try a few times to get the exception before failing
392    boolean sawError = false;
393    for (int i = 0; i < NUM_RETRIES && !sawError; i++) {
394      try (Table table = TEST_UTIL.getConnection().getTable(tn)) {
395        if (m instanceof Put) {
396          table.put((Put) m);
397        } else if (m instanceof Delete) {
398          table.delete((Delete) m);
399        } else if (m instanceof Append) {
400          table.append((Append) m);
401        } else if (m instanceof Increment) {
402          table.increment((Increment) m);
403        } else {
404          fail(
405            "Failed to apply " + m.getClass().getSimpleName() + " to the table. Programming error");
406        }
407        LOG.info("Did not reject the " + m.getClass().getSimpleName() + ", will sleep and retry");
408        Thread.sleep(2000);
409      } catch (Exception e) {
410        String msg = StringUtils.stringifyException(e);
411        assertTrue("Expected exception message to contain the word '" + policyToViolate.name()
412          + "', but was " + msg, msg.contains(policyToViolate.name()));
413        sawError = true;
414      }
415    }
416    if (!sawError) {
417      try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
418        ResultScanner scanner = quotaTable.getScanner(new Scan());
419        Result result = null;
420        LOG.info("Dumping contents of hbase:quota table");
421        while ((result = scanner.next()) != null) {
422          LOG.info(Bytes.toString(result.getRow()) + " => " + result.toString());
423        }
424        scanner.close();
425      }
426    }
427    assertTrue("Expected to see an exception writing data to a table exceeding its quota",
428      sawError);
429  }
430}