001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseTestingUtil;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.client.Admin;
030import org.apache.hadoop.hbase.client.CheckAndMutate;
031import org.apache.hadoop.hbase.client.Get;
032import org.apache.hadoop.hbase.client.Increment;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.RowMutations;
035import org.apache.hadoop.hbase.client.Table;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.testclassification.MediumTests;
038import org.apache.hadoop.hbase.testclassification.RegionServerTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049@Category({ RegionServerTests.class, MediumTests.class })
050public class TestAtomicReadQuota {
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
054  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
055  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
056  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
057  private static final byte[] FAMILY = Bytes.toBytes("cf");
058  private static final byte[] QUALIFIER = Bytes.toBytes("q");
059
060  @AfterClass
061  public static void tearDown() throws Exception {
062    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
063    EnvironmentEdgeManager.reset();
064    TEST_UTIL.deleteTable(TABLE_NAME);
065    TEST_UTIL.shutdownMiniCluster();
066  }
067
068  @BeforeClass
069  public static void setUpBeforeClass() throws Exception {
070    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
071    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
072    TEST_UTIL.startMiniCluster(1);
073    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
074    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
075    TEST_UTIL.waitTableAvailable(TABLE_NAME);
076    QuotaCache.TEST_FORCE_REFRESH = true;
077  }
078
079  @Test
080  public void testIncrementCountedAgainstReadCapacity() throws Exception {
081    setupGenericQuota();
082
083    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
084    inc.addColumn(FAMILY, QUALIFIER, 1);
085    testThrottle(table -> table.increment(inc));
086  }
087
088  @Test
089  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
090    setupGenericQuota();
091
092    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
093    Increment inc = new Increment(row);
094    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
095    Put put = new Put(row);
096    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
097
098    RowMutations rowMutations = new RowMutations(row);
099    rowMutations.add(inc);
100    rowMutations.add(put);
101    testThrottle(table -> table.mutateRow(rowMutations));
102  }
103
104  @Test
105  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
106    setupGenericQuota();
107
108    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
109    Put put = new Put(row);
110    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
111
112    RowMutations rowMutations = new RowMutations(row);
113    rowMutations.add(put);
114    try (Table table = getTable()) {
115      for (int i = 0; i < 100; i++) {
116        table.mutateRow(rowMutations);
117      }
118    }
119  }
120
121  @Test
122  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
123    setupGenericQuota();
124    runNonAtomicPuts();
125  }
126
127  @Test
128  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
129    setupGenericQuota();
130    runNonAtomicPuts();
131  }
132
133  @Test
134  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
135    setupGenericQuota();
136
137    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
138    byte[] value = Bytes.toBytes("v");
139    Put put = new Put(row);
140    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
141    CheckAndMutate checkAndMutate =
142      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);
143
144    testThrottle(table -> table.checkAndMutate(checkAndMutate));
145  }
146
147  @Test
148  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
149    setupGenericQuota();
150
151    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
152    Increment inc = new Increment(row);
153    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
154
155    List<Increment> incs = new ArrayList<>(2);
156    incs.add(inc);
157    incs.add(inc);
158
159    testThrottle(table -> {
160      Object[] results = new Object[] {};
161      table.batch(incs, results);
162      return results;
163    });
164  }
165
166  @Test
167  public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception {
168    setupAtomicOnlyReqNumQuota();
169
170    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
171    Increment inc = new Increment(row);
172    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
173
174    List<Increment> incs = new ArrayList<>(2);
175    incs.add(inc);
176    incs.add(inc);
177
178    testThrottle(table -> {
179      Object[] results = new Object[] {};
180      table.batch(incs, results);
181      return results;
182    });
183  }
184
185  @Test
186  public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception {
187    setupAtomicOnlyReadSizeQuota();
188
189    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
190    Increment inc = new Increment(row);
191    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
192
193    List<Increment> incs = new ArrayList<>(2);
194    incs.add(inc);
195    incs.add(inc);
196
197    testThrottle(table -> {
198      Object[] results = new Object[] {};
199      table.batch(incs, results);
200      return results;
201    });
202  }
203
204  @Test
205  public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception {
206    setupAtomicOnlyReqNumQuota();
207    runNonAtomicPuts();
208  }
209
210  @Test
211  public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception {
212    setupAtomicOnlyReadSizeQuota();
213    runNonAtomicPuts();
214  }
215
216  @Test
217  public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception {
218    setupAtomicOnlyReqNumQuota();
219    runNonAtomicReads();
220  }
221
222  @Test
223  public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception {
224    setupAtomicOnlyReadSizeQuota();
225    runNonAtomicReads();
226  }
227
228  private void runNonAtomicPuts() throws Exception {
229    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
230    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
231    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
232    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
233
234    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
235    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
236
237    List<Put> puts = new ArrayList<>(2);
238    puts.add(put1);
239    puts.add(put2);
240
241    try (Table table = getTable()) {
242      for (int i = 0; i < 100; i++) {
243        table.put(puts);
244      }
245    }
246  }
247
248  private void runNonAtomicReads() throws Exception {
249    try (Table table = getTable()) {
250      byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
251      Get get = new Get(row);
252      table.get(get);
253    }
254  }
255
256  private void setupGenericQuota() throws Exception {
257    try (Admin admin = TEST_UTIL.getAdmin()) {
258      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
259        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
260    }
261    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
262  }
263
264  private void setupAtomicOnlyReqNumQuota() throws Exception {
265    try (Admin admin = TEST_UTIL.getAdmin()) {
266      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
267        ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES));
268    }
269    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
270  }
271
272  private void setupAtomicOnlyReadSizeQuota() throws Exception {
273    try (Admin admin = TEST_UTIL.getAdmin()) {
274      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
275        ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES));
276    }
277    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
278  }
279
280  private void cleanupQuota() throws Exception {
281    try (Admin admin = TEST_UTIL.getAdmin()) {
282      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
283    }
284    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
285  }
286
287  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
288    try (Table table = getTable()) {
289      // we have a read quota configured, so this should fail
290      TEST_UTIL.waitFor(60_000, () -> {
291        try {
292          request.run(table);
293          return false;
294        } catch (Exception e) {
295          boolean success = e.getCause() instanceof RpcThrottlingException;
296          if (!success) {
297            LOG.error("Unexpected exception", e);
298          }
299          return success;
300        }
301      });
302    } finally {
303      cleanupQuota();
304    }
305  }
306
307  private Table getTable() throws IOException {
308    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
309    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
310    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
311      .build();
312  }
313
314  @FunctionalInterface
315  private interface ThrowingFunction<I, O> {
316    O run(I input) throws Exception;
317  }
318
319}