001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Admin; 030import org.apache.hadoop.hbase.client.CheckAndMutate; 031import org.apache.hadoop.hbase.client.Increment; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.RowMutations; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.security.User; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.hbase.testclassification.RegionServerTests; 038import org.apache.hadoop.hbase.util.Bytes; 039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 040import org.junit.AfterClass; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Test; 044import org.junit.experimental.categories.Category; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048@Category({ RegionServerTests.class, MediumTests.class }) 049public class TestAtomicReadQuota { 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestAtomicReadQuota.class); 053 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class); 054 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 055 private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); 056 private static final byte[] FAMILY = Bytes.toBytes("cf"); 057 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 058 059 @AfterClass 060 public static void tearDown() throws Exception { 061 ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); 062 EnvironmentEdgeManager.reset(); 063 TEST_UTIL.deleteTable(TABLE_NAME); 064 TEST_UTIL.shutdownMiniCluster(); 065 } 066 067 @BeforeClass 068 public static void setUpBeforeClass() throws Exception { 069 TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); 070 TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000); 071 TEST_UTIL.startMiniCluster(1); 072 TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); 073 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 074 TEST_UTIL.waitTableAvailable(TABLE_NAME); 075 QuotaCache.TEST_FORCE_REFRESH = true; 076 } 077 078 @Test 079 public void testIncrementCountedAgainstReadCapacity() throws Exception { 080 setupQuota(); 081 082 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 083 inc.addColumn(FAMILY, QUALIFIER, 1); 084 testThrottle(table -> table.increment(inc)); 085 } 086 087 @Test 088 public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception { 089 setupQuota(); 090 091 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 092 Increment inc = new Increment(row); 093 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 094 Put put = new Put(row); 095 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 096 097 RowMutations rowMutations = new RowMutations(row); 098 rowMutations.add(inc); 099 rowMutations.add(put); 100 testThrottle(table -> table.mutateRow(rowMutations)); 101 } 102 103 @Test 104 public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception { 105 setupQuota(); 106 107 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 108 Put put = new Put(row); 109 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 110 111 RowMutations rowMutations = new RowMutations(row); 112 rowMutations.add(put); 113 try (Table table = getTable()) { 114 for (int i = 0; i < 100; i++) { 115 table.mutateRow(rowMutations); 116 } 117 } 118 } 119 120 @Test 121 public void testNonAtomicPutOmittedFromReadCapacity() throws Exception { 122 setupQuota(); 123 124 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 125 Put put = new Put(row); 126 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 127 try (Table table = getTable()) { 128 for (int i = 0; i < 100; i++) { 129 table.put(put); 130 } 131 } 132 } 133 134 @Test 135 public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception { 136 setupQuota(); 137 138 Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 139 put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 140 Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 141 put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 142 143 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 144 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 145 146 List<Put> puts = new ArrayList<>(2); 147 puts.add(put1); 148 puts.add(put2); 149 150 try (Table table = getTable()) { 151 for (int i = 0; i < 100; i++) { 152 table.put(puts); 153 } 154 } 155 } 156 157 @Test 158 public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { 159 setupQuota(); 160 161 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 162 byte[] value = Bytes.toBytes("v"); 163 Put put = new Put(row); 164 put.addColumn(FAMILY, Bytes.toBytes("doot"), value); 165 CheckAndMutate checkAndMutate = 166 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put); 167 168 testThrottle(table -> table.checkAndMutate(checkAndMutate)); 169 } 170 171 @Test 172 public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { 173 setupQuota(); 174 175 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 176 Increment inc = new Increment(row); 177 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 178 179 List<Increment> incs = new ArrayList<>(2); 180 incs.add(inc); 181 incs.add(inc); 182 183 testThrottle(table -> { 184 Object[] results = new Object[] {}; 185 table.batch(incs, results); 186 return results; 187 }); 188 } 189 190 private void setupQuota() throws Exception { 191 try (Admin admin = TEST_UTIL.getAdmin()) { 192 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 193 ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES)); 194 } 195 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 196 } 197 198 private void cleanupQuota() throws Exception { 199 try (Admin admin = TEST_UTIL.getAdmin()) { 200 admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName())); 201 } 202 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 203 } 204 205 private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception { 206 try (Table table = getTable()) { 207 // we have a read quota configured, so this should fail 208 TEST_UTIL.waitFor(60_000, () -> { 209 try { 210 request.run(table); 211 return false; 212 } catch (Exception e) { 213 boolean success = e.getCause() instanceof RpcThrottlingException; 214 if (!success) { 215 LOG.error("Unexpected exception", e); 216 } 217 return success; 218 } 219 }); 220 } finally { 221 cleanupQuota(); 222 } 223 } 224 225 private Table getTable() throws IOException { 226 TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); 227 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 228 return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) 229 .build(); 230 } 231 232 @FunctionalInterface 233 private interface ThrowingFunction<I, O> { 234 O run(I input) throws Exception; 235 } 236 237}