001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtility; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.Admin; 030import org.apache.hadoop.hbase.client.CheckAndMutate; 031import org.apache.hadoop.hbase.client.Increment; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 034import org.apache.hadoop.hbase.client.RowMutations; 035import org.apache.hadoop.hbase.client.Table; 036import org.apache.hadoop.hbase.security.User; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.RegionServerTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@Category({ RegionServerTests.class, MediumTests.class }) 050public class TestAtomicReadQuota { 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestAtomicReadQuota.class); 054 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class); 055 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 056 private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); 057 private static final byte[] FAMILY = Bytes.toBytes("cf"); 058 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 059 060 @AfterClass 061 public static void tearDown() throws Exception { 062 ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); 063 EnvironmentEdgeManager.reset(); 064 TEST_UTIL.deleteTable(TABLE_NAME); 065 TEST_UTIL.shutdownMiniCluster(); 066 } 067 068 @BeforeClass 069 public static void setUpBeforeClass() throws Exception { 070 TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1); 071 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 072 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PAUSE, 1); 073 TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); 074 TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000); 075 TEST_UTIL.startMiniCluster(1); 076 TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); 077 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 078 TEST_UTIL.waitTableAvailable(TABLE_NAME); 079 QuotaCache.TEST_FORCE_REFRESH = true; 080 } 081 082 @Test 083 public void testIncrementCountedAgainstReadCapacity() throws Exception { 084 setupQuota(); 085 086 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 087 inc.addColumn(FAMILY, QUALIFIER, 1); 088 testThrottle(table -> table.increment(inc)); 089 } 090 091 @Test 092 public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception { 093 setupQuota(); 094 095 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 096 Increment inc = new Increment(row); 097 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 098 Put put = new Put(row); 099 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 100 101 RowMutations rowMutations = new RowMutations(row); 102 rowMutations.add(inc); 103 rowMutations.add(put); 104 testThrottle(table -> table.mutateRow(rowMutations)); 105 } 106 107 @Test 108 public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception { 109 setupQuota(); 110 111 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 112 Put put = new Put(row); 113 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 114 115 RowMutations rowMutations = new RowMutations(row); 116 rowMutations.add(put); 117 try (Table table = getTable()) { 118 for (int i = 0; i < 100; i++) { 119 table.mutateRow(rowMutations); 120 } 121 } 122 } 123 124 @Test 125 public void testNonAtomicPutOmittedFromReadCapacity() throws Exception { 126 setupQuota(); 127 128 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 129 Put put = new Put(row); 130 put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 131 try (Table table = getTable()) { 132 for (int i = 0; i < 100; i++) { 133 table.put(put); 134 } 135 } 136 } 137 138 @Test 139 public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception { 140 setupQuota(); 141 142 Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 143 put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 144 Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); 145 put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); 146 147 Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); 148 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 149 150 List<Put> puts = new ArrayList<>(2); 151 puts.add(put1); 152 puts.add(put2); 153 154 try (Table table = getTable()) { 155 for (int i = 0; i < 100; i++) { 156 table.put(puts); 157 } 158 } 159 } 160 161 @Test 162 public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { 163 setupQuota(); 164 165 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 166 byte[] value = Bytes.toBytes("v"); 167 Put put = new Put(row); 168 put.addColumn(FAMILY, Bytes.toBytes("doot"), value); 169 CheckAndMutate checkAndMutate = 170 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put); 171 172 testThrottle(table -> table.checkAndMutate(checkAndMutate)); 173 } 174 175 @Test 176 public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { 177 setupQuota(); 178 179 byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); 180 Increment inc = new Increment(row); 181 inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); 182 183 List<Increment> incs = new ArrayList<>(2); 184 incs.add(inc); 185 incs.add(inc); 186 187 testThrottle(table -> { 188 Object[] results = new Object[incs.size()]; 189 table.batch(incs, results); 190 return results; 191 }); 192 } 193 194 private void setupQuota() throws Exception { 195 try (Admin admin = TEST_UTIL.getAdmin()) { 196 admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), 197 ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES)); 198 } 199 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); 200 } 201 202 private void cleanupQuota() throws Exception { 203 try (Admin admin = TEST_UTIL.getAdmin()) { 204 admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName())); 205 } 206 ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); 207 } 208 209 private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception { 210 try (Table table = getTable()) { 211 // we have a read quota configured, so this should fail 212 TEST_UTIL.waitFor(60_000, () -> { 213 boolean success; 214 Exception ex; 215 try { 216 request.run(table); 217 return false; 218 } catch (RetriesExhaustedWithDetailsException e) { 219 success = e.getCauses().stream().allMatch(t -> t instanceof RpcThrottlingException 220 || t.getCause() instanceof RpcThrottlingException); 221 ex = e; 222 } catch (Exception e) { 223 success = e.getCause() instanceof RpcThrottlingException; 224 ex = e; 225 } 226 if (!success) { 227 LOG.error("Unexpected exception", ex); 228 } 229 return success; 230 }); 231 } finally { 232 cleanupQuota(); 233 } 234 } 235 236 private Table getTable() throws IOException { 237 return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) 238 .build(); 239 } 240 241 @FunctionalInterface 242 private interface ThrowingFunction<I, O> { 243 O run(I input) throws Exception; 244 } 245 246}