001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.util.Arrays;
021import java.util.List;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.Cell;
024import org.apache.hadoop.hbase.client.Mutation;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.ipc.RpcCall;
027import org.apache.hadoop.hbase.ipc.RpcServer;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030
031import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
032
033@InterfaceAudience.Private
034@InterfaceStability.Evolving
035public class DefaultOperationQuota implements OperationQuota {
036
037  // a single scan estimate can consume no more than this proportion of the limiter's limit
038  // this prevents a long-running scan from being estimated at, say, 100MB of IO against
039  // a <100MB/IO throttle (because this would never succeed)
040  private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;
041
042  protected final List<QuotaLimiter> limiters;
043  private final long writeCapacityUnit;
044  private final long readCapacityUnit;
045
046  // the available read/write quota size in bytes
047  protected long readAvailable = 0;
048  // estimated quota
049  protected long writeConsumed = 0;
050  protected long readConsumed = 0;
051  protected long writeCapacityUnitConsumed = 0;
052  protected long readCapacityUnitConsumed = 0;
053  // real consumed quota
054  private final long[] operationSize;
055  // difference between estimated quota and real consumed quota used in close method
056  // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
057  // of DefaultOperationQuota
058  protected long writeDiff = 0;
059  protected long readDiff = 0;
060  protected long writeCapacityUnitDiff = 0;
061  protected long readCapacityUnitDiff = 0;
062  private boolean useResultSizeBytes;
063  private long blockSizeBytes;
064  private long maxScanEstimate;
065  private boolean isAtomic = false;
066
067  public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
068    final QuotaLimiter... limiters) {
069    this(conf, Arrays.asList(limiters));
070    this.useResultSizeBytes =
071      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
072    this.blockSizeBytes = blockSizeBytes;
073    long readSizeLimit =
074      Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
075    maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
076  }
077
078  /**
079   * NOTE: The order matters. It should be something like [user, table, namespace, global]
080   */
081  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
082    this.writeCapacityUnit =
083      conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
084    this.readCapacityUnit =
085      conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
086    this.limiters = limiters;
087    int size = OperationType.values().length;
088    operationSize = new long[size];
089
090    for (int i = 0; i < size; ++i) {
091      operationSize[i] = 0;
092    }
093  }
094
095  @Override
096  public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic)
097    throws RpcThrottlingException {
098    updateEstimateConsumeBatchQuota(numWrites, numReads);
099    checkQuota(numWrites, numReads, isAtomic);
100  }
101
102  @Override
103  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
104    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
105    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
106      prevBlockBytesScannedDifference);
107    checkQuota(0, 1, false);
108  }
109
110  private void checkQuota(long numWrites, long numReads, boolean isAtomic)
111    throws RpcThrottlingException {
112    if (isAtomic) {
113      // Remember this flag for later use in close()
114      this.isAtomic = true;
115    }
116    readAvailable = Long.MAX_VALUE;
117    for (final QuotaLimiter limiter : limiters) {
118      if (limiter.isBypass()) {
119        continue;
120      }
121
122      long maxRequestsToEstimate = limiter.getRequestNumLimit();
123      long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit());
124      long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit());
125      long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit());
126      long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit());
127
128      limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
129        Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads),
130        Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed,
131        readCapacityUnitConsumed, isAtomic);
132      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
133    }
134
135    for (final QuotaLimiter limiter : limiters) {
136      limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
137        readCapacityUnitConsumed, isAtomic);
138    }
139  }
140
141  @Override
142  public void close() {
143    // Adjust the quota consumed for the specified operation
144    writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
145
146    long resultSize =
147      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
148    if (useResultSizeBytes) {
149      readDiff = resultSize - readConsumed;
150    } else {
151      long blockBytesScanned =
152        RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
153      readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
154    }
155
156    writeCapacityUnitDiff =
157      calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
158    readCapacityUnitDiff = calculateReadCapacityUnitDiff(
159      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
160      readConsumed);
161
162    for (final QuotaLimiter limiter : limiters) {
163      if (writeDiff != 0) {
164        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic);
165      }
166      if (readDiff != 0) {
167        limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic);
168      }
169    }
170  }
171
172  @Override
173  public long getReadAvailable() {
174    return readAvailable;
175  }
176
177  @Override
178  public long getReadConsumed() {
179    return readConsumed;
180  }
181
182  @Override
183  public void addGetResult(final Result result) {
184    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
185  }
186
187  @Override
188  public void addScanResult(final List<Result> results) {
189    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
190  }
191
192  @Override
193  public void addScanResultCells(final List<Cell> cells) {
194    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
195  }
196
197  @Override
198  public void addMutation(final Mutation mutation) {
199    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
200  }
201
202  /**
203   * Update estimate quota(read/write size/capacityUnits) which will be consumed
204   * @param numWrites the number of write requests
205   * @param numReads  the number of read requests
206   */
207  protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
208    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
209
210    if (useResultSizeBytes) {
211      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
212    } else {
213      // assume 1 block required for reads. this is probably a low estimate, which is okay
214      readConsumed = numReads > 0 ? blockSizeBytes : 0;
215    }
216
217    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
218    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
219  }
220
221  /**
222   * Update estimate quota(read/write size/capacityUnits) which will be consumed
223   * @param scanRequest                     the scan to be executed
224   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
225   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
226   *                                        scanner
227   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
228   *                                        calls
229   */
230  protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
231    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
232    if (useResultSizeBytes) {
233      readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
234    } else {
235      long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
236        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
237      readConsumed = Math.min(maxScanEstimate, estimate);
238    }
239
240    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
241  }
242
243  protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
244    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
245    /*
246     * Estimating scan workload is more complicated, and if we severely underestimate workloads then
247     * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
248     */
249    if (nextCallSeq == 0) {
250      // start scanners with an optimistic 1 block IO estimate
251      // it is better to underestimate a large scan in the beginning
252      // than to overestimate, and block, a small scan
253      return blockSizeBytes;
254    }
255
256    boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
257    if (isWorkloadGrowing) {
258      // if nextCallSeq > 0 and the workload is growing then our estimate
259      // should consider that the workload may continue to increase
260      return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned);
261    } else {
262      // if nextCallSeq > 0 and the workload is shrinking or flat
263      // then our workload has likely plateaued. We can just rely on the existing
264      // maxBlockBytesScanned as our estimate in this case.
265      return maxBlockBytesScanned;
266    }
267  }
268
269  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
270    if (numReqs > 0) {
271      return avgSize * numReqs;
272    }
273    return 0;
274  }
275
276  private long calculateWriteCapacityUnit(final long size) {
277    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
278  }
279
280  private long calculateReadCapacityUnit(final long size) {
281    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
282  }
283
284  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
285    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
286  }
287
288  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
289    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
290  }
291}