001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.util.Arrays; 021import java.util.List; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.Cell; 024import org.apache.hadoop.hbase.client.Mutation; 025import org.apache.hadoop.hbase.client.Result; 026import org.apache.hadoop.hbase.ipc.RpcCall; 027import org.apache.hadoop.hbase.ipc.RpcServer; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.yetus.audience.InterfaceStability; 030 031import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 032 033@InterfaceAudience.Private 034@InterfaceStability.Evolving 035public class DefaultOperationQuota implements OperationQuota { 036 037 // a single scan estimate can consume no more than this proportion of the limiter's limit 038 // this prevents a long-running scan from being estimated at, say, 100MB of IO against 039 // a <100MB/IO throttle (because this would never succeed) 040 private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9; 041 042 protected final List<QuotaLimiter> limiters; 043 private final long writeCapacityUnit; 044 private final long readCapacityUnit; 045 046 // the available read/write quota size in bytes 047 protected long readAvailable = 0; 048 // estimated quota 049 protected long writeConsumed = 0; 050 protected long readConsumed = 0; 051 protected long writeCapacityUnitConsumed = 0; 052 protected long readCapacityUnitConsumed = 0; 053 // real consumed quota 054 private final long[] operationSize; 055 // difference between estimated quota and real consumed quota used in close method 056 // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass 057 // of DefaultOperationQuota 058 protected long writeDiff = 0; 059 protected long readDiff = 0; 060 protected long writeCapacityUnitDiff = 0; 061 protected long readCapacityUnitDiff = 0; 062 private boolean useResultSizeBytes; 063 private long blockSizeBytes; 064 private long maxScanEstimate; 065 private boolean isAtomic = false; 066 067 public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, 068 final QuotaLimiter... limiters) { 069 this(conf, Arrays.asList(limiters)); 070 this.useResultSizeBytes = 071 conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); 072 this.blockSizeBytes = blockSizeBytes; 073 long readSizeLimit = 074 Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE); 075 maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit); 076 } 077 078 /** 079 * NOTE: The order matters. It should be something like [user, table, namespace, global] 080 */ 081 public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) { 082 this.writeCapacityUnit = 083 conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT); 084 this.readCapacityUnit = 085 conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT); 086 this.limiters = limiters; 087 int size = OperationType.values().length; 088 operationSize = new long[size]; 089 090 for (int i = 0; i < size; ++i) { 091 operationSize[i] = 0; 092 } 093 } 094 095 @Override 096 public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) 097 throws RpcThrottlingException { 098 updateEstimateConsumeBatchQuota(numWrites, numReads); 099 checkQuota(numWrites, numReads, isAtomic); 100 } 101 102 @Override 103 public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, 104 long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { 105 updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, 106 prevBlockBytesScannedDifference); 107 checkQuota(0, 1, false); 108 } 109 110 private void checkQuota(long numWrites, long numReads, boolean isAtomic) 111 throws RpcThrottlingException { 112 if (isAtomic) { 113 // Remember this flag for later use in close() 114 this.isAtomic = true; 115 } 116 readAvailable = Long.MAX_VALUE; 117 for (final QuotaLimiter limiter : limiters) { 118 if (limiter.isBypass()) { 119 continue; 120 } 121 122 long maxRequestsToEstimate = limiter.getRequestNumLimit(); 123 long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()); 124 long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()); 125 long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit()); 126 long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit()); 127 128 limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), 129 Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), 130 Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, 131 readCapacityUnitConsumed, isAtomic); 132 readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); 133 } 134 135 for (final QuotaLimiter limiter : limiters) { 136 limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, 137 readCapacityUnitConsumed, isAtomic); 138 } 139 } 140 141 @Override 142 public void close() { 143 // Adjust the quota consumed for the specified operation 144 writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; 145 146 long resultSize = 147 operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()]; 148 if (useResultSizeBytes) { 149 readDiff = resultSize - readConsumed; 150 } else { 151 long blockBytesScanned = 152 RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L); 153 readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; 154 } 155 156 writeCapacityUnitDiff = 157 calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed); 158 readCapacityUnitDiff = calculateReadCapacityUnitDiff( 159 operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()], 160 readConsumed); 161 162 for (final QuotaLimiter limiter : limiters) { 163 if (writeDiff != 0) { 164 limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic); 165 } 166 if (readDiff != 0) { 167 limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic); 168 } 169 } 170 } 171 172 @Override 173 public long getReadAvailable() { 174 return readAvailable; 175 } 176 177 @Override 178 public long getReadConsumed() { 179 return readConsumed; 180 } 181 182 @Override 183 public void addGetResult(final Result result) { 184 operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result); 185 } 186 187 @Override 188 public void addScanResult(final List<Result> results) { 189 operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results); 190 } 191 192 @Override 193 public void addScanResultCells(final List<Cell> cells) { 194 operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells); 195 } 196 197 @Override 198 public void addMutation(final Mutation mutation) { 199 operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); 200 } 201 202 /** 203 * Update estimate quota(read/write size/capacityUnits) which will be consumed 204 * @param numWrites the number of write requests 205 * @param numReads the number of read requests 206 */ 207 protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) { 208 writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); 209 210 if (useResultSizeBytes) { 211 readConsumed = estimateConsume(OperationType.GET, numReads, 100); 212 } else { 213 // assume 1 block required for reads. this is probably a low estimate, which is okay 214 readConsumed = numReads > 0 ? blockSizeBytes : 0; 215 } 216 217 writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); 218 readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); 219 } 220 221 /** 222 * Update estimate quota(read/write size/capacityUnits) which will be consumed 223 * @param scanRequest the scan to be executed 224 * @param maxScannerResultSize the maximum bytes to be returned by the scanner 225 * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the 226 * scanner 227 * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next 228 * calls 229 */ 230 protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest, 231 long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { 232 if (useResultSizeBytes) { 233 readConsumed = estimateConsume(OperationType.SCAN, 1, 1000); 234 } else { 235 long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(), 236 maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); 237 readConsumed = Math.min(maxScanEstimate, estimate); 238 } 239 240 readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); 241 } 242 243 protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq, 244 long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { 245 /* 246 * Estimating scan workload is more complicated, and if we severely underestimate workloads then 247 * throttled clients will exhaust retries too quickly, and could saturate the RPC layer 248 */ 249 if (nextCallSeq == 0) { 250 // start scanners with an optimistic 1 block IO estimate 251 // it is better to underestimate a large scan in the beginning 252 // than to overestimate, and block, a small scan 253 return blockSizeBytes; 254 } 255 256 boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes; 257 if (isWorkloadGrowing) { 258 // if nextCallSeq > 0 and the workload is growing then our estimate 259 // should consider that the workload may continue to increase 260 return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned); 261 } else { 262 // if nextCallSeq > 0 and the workload is shrinking or flat 263 // then our workload has likely plateaued. We can just rely on the existing 264 // maxBlockBytesScanned as our estimate in this case. 265 return maxBlockBytesScanned; 266 } 267 } 268 269 private long estimateConsume(final OperationType type, int numReqs, long avgSize) { 270 if (numReqs > 0) { 271 return avgSize * numReqs; 272 } 273 return 0; 274 } 275 276 private long calculateWriteCapacityUnit(final long size) { 277 return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit); 278 } 279 280 private long calculateReadCapacityUnit(final long size) { 281 return (long) Math.ceil(size * 1.0 / this.readCapacityUnit); 282 } 283 284 private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) { 285 return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize); 286 } 287 288 private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) { 289 return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize); 290 } 291}