/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

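/**
 * Verifies that atomic operations (Increment, CheckAndMutate, conditional RowMutations, and
 * atomic batches) are counted against read quotas and atomic-only quotas, while non-atomic puts
 * and gets are not throttled by them.
 */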
@Category({ RegionServerTests.class, MediumTests.class })
public class TestAtomicReadQuota {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
  private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
  private static final byte[] FAMILY = Bytes.toBytes("cf");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");

  @AfterClass
  public static void tearDown() throws Exception {
    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
    EnvironmentEdgeManager.reset();
    TEST_UTIL.deleteTable(TABLE_NAME);
    TEST_UTIL.shutdownMiniCluster();
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
    TEST_UTIL.waitTableAvailable(TABLE_NAME);
    QuotaCache.TEST_FORCE_REFRESH = true;
  }

  @Test
  public void testIncrementCountedAgainstReadCapacity() throws Exception {
    setupGenericQuota();

    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
    inc.addColumn(FAMILY, QUALIFIER, 1);
    testThrottle(table -> table.increment(inc));
  }

  @Test
  public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
    setupGenericQuota();

    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
    Increment inc = new Increment(row);
    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

    RowMutations rowMutations = new RowMutations(row);
    rowMutations.add(inc);
    rowMutations.add(put);
    testThrottle(table -> table.mutateRow(rowMutations));
  }

  @Test
  public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
    setupGenericQuota();

    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

    RowMutations rowMutations = new RowMutations(row);
    rowMutations.add(put);
    try (Table table = getTable()) {
      for (int i = 0; i < 100; i++) {
        table.mutateRow(rowMutations);
      }
    }
  }

  @Test
  public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
    setupGenericQuota();
    runNonAtomicPuts();
  }

  @Test
  public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
    setupGenericQuota();
    runNonAtomicPuts();
  }

  @Test
  public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
    setupGenericQuota();

    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
    byte[] value = Bytes.toBytes("v");
    Put put = new Put(row);
    put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
    CheckAndMutate checkAndMutate =
      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);

    testThrottle(table -> table.checkAndMutate(checkAndMutate));
  }

  @Test
  public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
    setupGenericQuota();

    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
    Increment inc = new Increment(row);
    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

    List<Increment> incs = new ArrayList<>(2);
    incs.add(inc);
    incs.add(inc);

    testThrottle(table -> {
      Object[] results = new Object[] {};
      table.batch(incs, results);
      return results;
    });
  }

  @Test
  public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception {
    setupAtomicOnlyReqNumQuota();

    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
    Increment inc = new Increment(row);
    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

    List<Increment> incs = new ArrayList<>(2);
    incs.add(inc);
    incs.add(inc);

    testThrottle(table -> {
      Object[] results = new Object[] {};
      table.batch(incs, results);
      return results;
    });
  }

  @Test
  public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception {
    setupAtomicOnlyReadSizeQuota();

    byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
    Increment inc = new Increment(row);
    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

    List<Increment> incs = new ArrayList<>(2);
    incs.add(inc);
    incs.add(inc);

    testThrottle(table -> {
      Object[] results = new Object[] {};
      table.batch(incs, results);
      return results;
    });
  }

  @Test
  public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception {
    setupAtomicOnlyReqNumQuota();
    runNonAtomicPuts();
  }

  @Test
  public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception {
    setupAtomicOnlyReadSizeQuota();
    runNonAtomicPuts();
  }

  @Test
  public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception {
    setupAtomicOnlyReqNumQuota();
    runNonAtomicReads();
  }

  @Test
  public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception {
    setupAtomicOnlyReadSizeQuota();
    runNonAtomicReads();
  }

  private void runNonAtomicPuts() throws Exception {
    Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
    put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
    Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
    put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

    Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
    inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

    List<Put> puts = new ArrayList<>(2);
    puts.add(put1);
    puts.add(put2);

    try (Table table = getTable()) {
      for (int i = 0; i < 100; i++) {
        table.put(puts);
      }
    }
  }

  private void runNonAtomicReads() throws Exception {
    try (Table table = getTable()) {
      byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
      Get get = new Get(row);
      table.get(get);
    }
  }

  private void setupGenericQuota() throws Exception {
    try (Admin admin = TEST_UTIL.getAdmin()) {
      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
        ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
    }
    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
  }

  private void setupAtomicOnlyReqNumQuota() throws Exception {
    try (Admin admin = TEST_UTIL.getAdmin()) {
      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
        ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES));
    }
    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
  }

  private void setupAtomicOnlyReadSizeQuota() throws Exception {
    try (Admin admin = TEST_UTIL.getAdmin()) {
      admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
        ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES));
    }
    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
  }

  private void cleanupQuota() throws Exception {
    try (Admin admin = TEST_UTIL.getAdmin()) {
      admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
    }
    ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
  }

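  /**
   * Repeatedly issues the given request until it fails with an RpcThrottlingException, which
   * proves the configured quota was enforced; the quota is removed again afterwards.
   */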
  private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
    try (Table table = getTable()) {
      // we have a read quota configured, so this should fail
      TEST_UTIL.waitFor(60_000, () -> {
        try {
          request.run(table);
          return false;
        } catch (Exception e) {
          boolean success = e.getCause() instanceof RpcThrottlingException;
          if (!success) {
            LOG.error("Unexpected exception", e);
          }
          return success;
        }
      });
    } finally {
      cleanupQuota();
    }
  }

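  /**
   * Builds a Table with a short operation timeout and a single retry so that throttled calls
   * fail fast rather than blocking the test.
   */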
  private Table getTable() throws IOException {
    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
      .build();
  }

  @FunctionalInterface
  private interface ThrowingFunction<I, O> {
    O run(I input) throws Exception;
  }

}