001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022import static org.junit.Assert.assertTrue;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.testclassification.RegionServerTests;
027import org.apache.hadoop.hbase.testclassification.SmallTests;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
030import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
031import org.junit.ClassRule;
032import org.junit.Test;
033import org.junit.experimental.categories.Category;
034
035import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
037
038@Category({ RegionServerTests.class, SmallTests.class })
039public class TestDefaultOperationQuota {
040  @ClassRule
041  public static final HBaseClassTestRule CLASS_RULE =
042    HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);
043
044  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
045  static {
046    envEdge.setValue(EnvironmentEdgeManager.currentTime());
047    // only active the envEdge for quotas package
048    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
049      ThrottleQuotaTestUtil.class.getPackage().getName());
050  }
051
052  @Test
053  public void testScanEstimateNewScanner() {
054    long blockSize = 64 * 1024;
055    long nextCallSeq = 0;
056    long maxScannerResultSize = 100 * 1024 * 1024;
057    long maxBlockBytesScanned = 0;
058    long prevBBSDifference = 0;
059    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
060      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
061
062    // new scanner should estimate scan read as 1 block
063    assertEquals(blockSize, estimate);
064  }
065
066  @Test
067  public void testScanEstimateSecondNextCall() {
068    long blockSize = 64 * 1024;
069    long nextCallSeq = 1;
070    long maxScannerResultSize = 100 * 1024 * 1024;
071    long maxBlockBytesScanned = 10 * blockSize;
072    long prevBBSDifference = 10 * blockSize;
073    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
074      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
075
076    // 2nd next call should be estimated at maxBBS
077    assertEquals(maxBlockBytesScanned, estimate);
078  }
079
080  @Test
081  public void testScanEstimateFlatWorkload() {
082    long blockSize = 64 * 1024;
083    long nextCallSeq = 100;
084    long maxScannerResultSize = 100 * 1024 * 1024;
085    long maxBlockBytesScanned = 10 * blockSize;
086    long prevBBSDifference = 0;
087    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
088      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
089
090    // flat workload should not overestimate
091    assertEquals(maxBlockBytesScanned, estimate);
092  }
093
094  @Test
095  public void testScanEstimateVariableFlatWorkload() {
096    long blockSize = 64 * 1024;
097    long nextCallSeq = 1;
098    long maxScannerResultSize = 100 * 1024 * 1024;
099    long maxBlockBytesScanned = 10 * blockSize;
100    long prevBBSDifference = 0;
101    for (int i = 0; i < 100; i++) {
102      long variation = Math.round(Math.random() * blockSize);
103      if (variation % 2 == 0) {
104        variation *= -1;
105      }
106      // despite +/- <1 block variation, we consider this workload flat
107      prevBBSDifference = variation;
108
109      long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i,
110        maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
111
112      // flat workload should not overestimate
113      assertEquals(maxBlockBytesScanned, estimate);
114    }
115  }
116
117  @Test
118  public void testScanEstimateGrowingWorkload() {
119    long blockSize = 64 * 1024;
120    long nextCallSeq = 100;
121    long maxScannerResultSize = 100 * 1024 * 1024;
122    long maxBlockBytesScanned = 20 * blockSize;
123    long prevBBSDifference = 10 * blockSize;
124    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
125      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
126
127    // growing workload should overestimate
128    assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate);
129  }
130
131  @Test
132  public void testScanEstimateShrinkingWorkload() {
133    long blockSize = 64 * 1024;
134    long nextCallSeq = 100;
135    long maxScannerResultSize = 100 * 1024 * 1024;
136    long maxBlockBytesScanned = 20 * blockSize;
137    long prevBBSDifference = -10 * blockSize;
138    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
139      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);
140
141    // shrinking workload should only shrink estimate to maxBBS
142    assertEquals(maxBlockBytesScanned, estimate);
143  }
144
145  @Test
146  public void testLargeBatchSaturatesReadNumLimit()
147    throws RpcThrottlingException, InterruptedException {
148    int limit = 10;
149    QuotaProtos.Throttle throttle =
150      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
151        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
152    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
153    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
154
155    // use the whole limit
156    quota.checkBatchQuota(0, limit);
157
158    // the next request should be rejected
159    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
160
161    envEdge.incValue(1000);
162    // after the TimeUnit, the limit should be refilled
163    quota.checkBatchQuota(0, limit);
164  }
165
166  @Test
167  public void testLargeBatchSaturatesReadWriteLimit()
168    throws RpcThrottlingException, InterruptedException {
169    int limit = 10;
170    QuotaProtos.Throttle throttle =
171      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
172        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
173    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
174    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
175
176    // use the whole limit
177    quota.checkBatchQuota(limit, 0);
178
179    // the next request should be rejected
180    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));
181
182    envEdge.incValue(1000);
183    // after the TimeUnit, the limit should be refilled
184    quota.checkBatchQuota(limit, 0);
185  }
186
187  @Test
188  public void testTooLargeReadBatchIsNotBlocked()
189    throws RpcThrottlingException, InterruptedException {
190    int limit = 10;
191    QuotaProtos.Throttle throttle =
192      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
193        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
194    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
195    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
196
197    // use more than the limit, which should succeed rather than being indefinitely blocked
198    quota.checkBatchQuota(0, 10 + limit);
199
200    // the next request should be blocked
201    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
202
203    envEdge.incValue(1000);
204    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
205    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit));
206  }
207
208  @Test
209  public void testTooLargeWriteBatchIsNotBlocked()
210    throws RpcThrottlingException, InterruptedException {
211    int limit = 10;
212    QuotaProtos.Throttle throttle =
213      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
214        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
215    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
216    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
217
218    // use more than the limit, which should succeed rather than being indefinitely blocked
219    quota.checkBatchQuota(10 + limit, 0);
220
221    // the next request should be blocked
222    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));
223
224    envEdge.incValue(1000);
225    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
226    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
227  }
228
229  @Test
230  public void testTooLargeWriteSizeIsNotBlocked()
231    throws RpcThrottlingException, InterruptedException {
232    int limit = 50;
233    QuotaProtos.Throttle throttle =
234      QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
235        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
236    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
237    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter);
238
239    // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked
240    quota.checkBatchQuota(1, 0);
241
242    // the next request should be blocked
243    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0));
244
245    envEdge.incValue(1000);
246    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
247    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0));
248  }
249
250  @Test
251  public void testTooLargeReadSizeIsNotBlocked()
252    throws RpcThrottlingException, InterruptedException {
253    long blockSize = 65536;
254    long limit = blockSize / 2;
255    QuotaProtos.Throttle throttle =
256      QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
257        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
258    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
259    DefaultOperationQuota quota =
260      new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
261
262    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
263    quota.checkBatchQuota(0, 1);
264
265    // the next request should be blocked
266    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
267
268    envEdge.incValue(1000);
269    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
270    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
271  }
272
273  @Test
274  public void testTooLargeRequestSizeIsNotBlocked()
275    throws RpcThrottlingException, InterruptedException {
276    long blockSize = 65536;
277    long limit = blockSize / 2;
278    QuotaProtos.Throttle throttle =
279      QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
280        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
281    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
282    DefaultOperationQuota quota =
283      new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
284
285    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
286    quota.checkBatchQuota(0, 1);
287
288    // the next request should be blocked
289    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1));
290
291    envEdge.incValue(1000);
292    // even after the TimeUnit, the limit should not be refilled because we oversubscribed
293    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1));
294  }
295}