001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022import static org.junit.Assert.assertTrue; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.testclassification.RegionServerTests; 027import org.apache.hadoop.hbase.testclassification.SmallTests; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 030import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 031import org.junit.ClassRule; 032import org.junit.Test; 033import org.junit.experimental.categories.Category; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 037 038@Category({ RegionServerTests.class, SmallTests.class }) 039public class TestDefaultOperationQuota { 040 @ClassRule 041 public static final HBaseClassTestRule CLASS_RULE = 042 HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); 043 044 private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); 045 static { 046 envEdge.setValue(EnvironmentEdgeManager.currentTime()); 047 // only active the envEdge for quotas package 048 EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, 049 ThrottleQuotaTestUtil.class.getPackage().getName()); 050 } 051 052 @Test 053 public void testScanEstimateNewScanner() { 054 long blockSize = 64 * 1024; 055 long nextCallSeq = 0; 056 long maxScannerResultSize = 100 * 1024 * 1024; 057 long maxBlockBytesScanned = 0; 058 long prevBBSDifference = 0; 059 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 060 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 061 062 // new scanner should estimate scan read as 1 block 063 assertEquals(blockSize, estimate); 064 } 065 066 @Test 067 public void testScanEstimateSecondNextCall() { 068 long blockSize = 64 * 1024; 069 long nextCallSeq = 1; 070 long maxScannerResultSize = 100 * 1024 * 1024; 071 long maxBlockBytesScanned = 10 * blockSize; 072 long prevBBSDifference = 10 * blockSize; 073 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 074 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 075 076 // 2nd next call should be estimated at maxBBS 077 assertEquals(maxBlockBytesScanned, estimate); 078 } 079 080 @Test 081 public void testScanEstimateFlatWorkload() { 082 long blockSize = 64 * 1024; 083 long nextCallSeq = 100; 084 long maxScannerResultSize = 100 * 1024 * 1024; 085 long maxBlockBytesScanned = 10 * blockSize; 086 long prevBBSDifference = 0; 087 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 088 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 089 090 // flat workload should not overestimate 091 assertEquals(maxBlockBytesScanned, estimate); 092 } 093 094 @Test 095 public void testScanEstimateVariableFlatWorkload() { 096 long blockSize = 64 * 1024; 097 long nextCallSeq = 1; 098 long maxScannerResultSize = 100 * 1024 * 1024; 099 long maxBlockBytesScanned = 10 * blockSize; 100 long prevBBSDifference = 0; 101 for (int i = 0; i < 100; i++) { 102 long variation = Math.round(Math.random() * blockSize); 103 if (variation % 2 == 0) { 104 variation *= -1; 105 } 106 // despite +/- <1 block variation, we consider this workload flat 107 prevBBSDifference = variation; 108 109 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i, 110 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 111 112 // flat workload should not overestimate 113 assertEquals(maxBlockBytesScanned, estimate); 114 } 115 } 116 117 @Test 118 public void testScanEstimateGrowingWorkload() { 119 long blockSize = 64 * 1024; 120 long nextCallSeq = 100; 121 long maxScannerResultSize = 100 * 1024 * 1024; 122 long maxBlockBytesScanned = 20 * blockSize; 123 long prevBBSDifference = 10 * blockSize; 124 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 125 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 126 127 // growing workload should overestimate 128 assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate); 129 } 130 131 @Test 132 public void testScanEstimateShrinkingWorkload() { 133 long blockSize = 64 * 1024; 134 long nextCallSeq = 100; 135 long maxScannerResultSize = 100 * 1024 * 1024; 136 long maxBlockBytesScanned = 20 * blockSize; 137 long prevBBSDifference = -10 * blockSize; 138 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 139 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 140 141 // shrinking workload should only shrink estimate to maxBBS 142 assertEquals(maxBlockBytesScanned, estimate); 143 } 144 145 @Test 146 public void testLargeBatchSaturatesReadNumLimit() 147 throws RpcThrottlingException, InterruptedException { 148 int limit = 10; 149 QuotaProtos.Throttle throttle = 150 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 151 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 152 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 153 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); 154 155 // use the whole limit 156 quota.checkBatchQuota(0, limit); 157 158 // the next request should be rejected 159 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); 160 161 envEdge.incValue(1000); 162 // after the TimeUnit, the limit should be refilled 163 quota.checkBatchQuota(0, limit); 164 } 165 166 @Test 167 public void testLargeBatchSaturatesReadWriteLimit() 168 throws RpcThrottlingException, InterruptedException { 169 int limit = 10; 170 QuotaProtos.Throttle throttle = 171 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 172 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 173 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 174 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); 175 176 // use the whole limit 177 quota.checkBatchQuota(limit, 0); 178 179 // the next request should be rejected 180 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); 181 182 envEdge.incValue(1000); 183 // after the TimeUnit, the limit should be refilled 184 quota.checkBatchQuota(limit, 0); 185 } 186 187 @Test 188 public void testTooLargeReadBatchIsNotBlocked() 189 throws RpcThrottlingException, InterruptedException { 190 int limit = 10; 191 QuotaProtos.Throttle throttle = 192 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 193 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 194 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 195 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); 196 197 // use more than the limit, which should succeed rather than being indefinitely blocked 198 quota.checkBatchQuota(0, 10 + limit); 199 200 // the next request should be blocked 201 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); 202 203 envEdge.incValue(1000); 204 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 205 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); 206 } 207 208 @Test 209 public void testTooLargeWriteBatchIsNotBlocked() 210 throws RpcThrottlingException, InterruptedException { 211 int limit = 10; 212 QuotaProtos.Throttle throttle = 213 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 214 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 215 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 216 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); 217 218 // use more than the limit, which should succeed rather than being indefinitely blocked 219 quota.checkBatchQuota(10 + limit, 0); 220 221 // the next request should be blocked 222 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); 223 224 envEdge.incValue(1000); 225 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 226 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); 227 } 228 229 @Test 230 public void testTooLargeWriteSizeIsNotBlocked() 231 throws RpcThrottlingException, InterruptedException { 232 int limit = 50; 233 QuotaProtos.Throttle throttle = 234 QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() 235 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 236 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 237 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); 238 239 // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked 240 quota.checkBatchQuota(1, 0); 241 242 // the next request should be blocked 243 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); 244 245 envEdge.incValue(1000); 246 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 247 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); 248 } 249 250 @Test 251 public void testTooLargeReadSizeIsNotBlocked() 252 throws RpcThrottlingException, InterruptedException { 253 long blockSize = 65536; 254 long limit = blockSize / 2; 255 QuotaProtos.Throttle throttle = 256 QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() 257 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 258 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 259 DefaultOperationQuota quota = 260 new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); 261 262 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 263 quota.checkBatchQuota(0, 1); 264 265 // the next request should be blocked 266 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); 267 268 envEdge.incValue(1000); 269 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 270 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); 271 } 272 273 @Test 274 public void testTooLargeRequestSizeIsNotBlocked() 275 throws RpcThrottlingException, InterruptedException { 276 long blockSize = 65536; 277 long limit = blockSize / 2; 278 QuotaProtos.Throttle throttle = 279 QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() 280 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 281 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 282 DefaultOperationQuota quota = 283 new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); 284 285 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 286 quota.checkBatchQuota(0, 1); 287 288 // the next request should be blocked 289 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); 290 291 envEdge.incValue(1000); 292 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 293 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); 294 } 295}