/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * {@link OperationQuota} implementation that estimates the read/write cost of an operation up
 * front, checks and grabs that estimate against a list of {@link QuotaLimiter}s, and then, in
 * {@link #close()}, corrects each limiter by the difference between the estimate and the size
 * that was actually recorded through the {@code add*} methods.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DefaultOperationQuota implements OperationQuota {

  // a single scan estimate can consume no more than this proportion of the limiter's limit
  // this prevents a long-running scan from being estimated at, say, 100MB of IO against
  // a <100MB/IO throttle (because this would never succeed)
  private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;

  protected final List<QuotaLimiter> limiters;
  private final long writeCapacityUnit;
  private final long readCapacityUnit;

  // the available read/write quota size in bytes
  protected long readAvailable = 0;
  // estimated quota
  protected long writeConsumed = 0;
  protected long readConsumed = 0;
  protected long writeCapacityUnitConsumed = 0;
  protected long readCapacityUnitConsumed = 0;
  // real consumed quota, indexed by OperationType ordinal
  private final long[] operationSize;
  // difference between estimated quota and real consumed quota used in close method
  // to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
  // of DefaultOperationQuota
  protected long writeDiff = 0;
  protected long readDiff = 0;
  protected long writeCapacityUnitDiff = 0;
  protected long readCapacityUnitDiff = 0;
  private boolean useResultSizeBytes;
  private long blockSizeBytes;
  private long maxScanEstimate;

  /**
   * Creates a quota over the given limiters and additionally configures the block-size based
   * read estimation: whether result-size bytes are used, the block size, and the cap applied to
   * scan estimates (a fixed proportion of the smallest read limit among {@code limiters}).
   * @param conf           configuration to read capacity units and the result-size flag from
   * @param blockSizeBytes block size in bytes used for read estimates
   * @param limiters       the limiters to check; see the list-based constructor for ordering
   */
  public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
    final QuotaLimiter... limiters) {
    this(conf, Arrays.asList(limiters));
    this.useResultSizeBytes =
      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);
    this.blockSizeBytes = blockSizeBytes;
    // With no limiters there is no read limit to respect, so fall back to Long.MAX_VALUE.
    long readSizeLimit =
      Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE);
    maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit);
  }

  /**
   * NOTE: The order matters. It should be something like [user, table, namespace, global]
   * <p>
   * This constructor leaves {@code useResultSizeBytes}, {@code blockSizeBytes} and
   * {@code maxScanEstimate} at their default values ({@code false} / {@code 0}).
   * @param conf     configuration to read the read/write capacity units from
   * @param limiters the limiters to check, in priority order
   */
  public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
    this.writeCapacityUnit =
      conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
    this.readCapacityUnit =
      conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
    this.limiters = limiters;
    // A freshly allocated long[] is already zero-filled, so no explicit reset is required.
    this.operationSize = new long[OperationType.values().length];
  }

  /**
   * Estimates the cost of a batch of reads and writes, then checks and grabs that estimate from
   * the limiters.
   * @param numWrites the number of write requests
   * @param numReads  the number of read requests
   * @throws RpcThrottlingException if a limiter rejects the estimated consumption
   */
  @Override
  public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
    updateEstimateConsumeBatchQuota(numWrites, numReads);
    checkQuota(numWrites, numReads);
  }

  /**
   * Estimates the cost of the next scan call, then checks and grabs that estimate (counted as a
   * single read request) from the limiters.
   * @param scanRequest                     the scan to be executed
   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
   *                                        scanner
   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
   *                                        calls
   * @throws RpcThrottlingException if a limiter rejects the estimated consumption
   */
  @Override
  public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
    long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
    updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
      prevBlockBytesScannedDifference);
    checkQuota(0, 1);
  }

  /**
   * Checks the current estimates against every non-bypass limiter and, if none of them throws,
   * grabs the estimates from all limiters. Also refreshes {@link #readAvailable} with the
   * minimum read availability reported by the non-bypass limiters.
   */
  private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
    readAvailable = Long.MAX_VALUE;
    for (final QuotaLimiter limiter : limiters) {
      if (limiter.isBypass()) {
        continue;
      }

      limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
        writeCapacityUnitConsumed, readCapacityUnitConsumed);
      readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
    }

    // Only reached when every check above passed; note that this loop does not skip bypass
    // limiters.
    for (final QuotaLimiter limiter : limiters) {
      limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
        readCapacityUnitConsumed);
    }
  }

  /**
   * Computes the differences between the estimated and the actually recorded consumption and
   * applies every non-zero size difference to each limiter.
   */
  @Override
  public void close() {
    // Adjust the quota consumed for the specified operation
    final long mutationSize = operationSize[OperationType.MUTATE.ordinal()];
    writeDiff = mutationSize - writeConsumed;

    final long resultSize =
      operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()];
    if (useResultSizeBytes) {
      readDiff = resultSize - readConsumed;
    } else {
      // Charge whichever is larger: the bytes of blocks scanned by the current RPC call (0 when
      // there is no current call) or the size of the results returned.
      long blockBytesScanned =
        RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
      readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
    }

    writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(mutationSize, writeConsumed);
    // NOTE(review): the capacity-unit diff is always derived from resultSize, even when readDiff
    // above is derived from blockBytesScanned -- confirm this asymmetry is intended.
    readCapacityUnitDiff = calculateReadCapacityUnitDiff(resultSize, readConsumed);

    for (final QuotaLimiter limiter : limiters) {
      if (writeDiff != 0) {
        limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
      }
      if (readDiff != 0) {
        limiter.consumeRead(readDiff, readCapacityUnitDiff);
      }
    }
  }

  /** Returns the read availability computed by the most recent quota check. */
  @Override
  public long getReadAvailable() {
    return readAvailable;
  }

  /** Returns the current estimated read consumption. */
  @Override
  public long getReadConsumed() {
    return readConsumed;
  }

  /**
   * Adds the size of a get result to the recorded GET consumption.
   * @param result the result returned by the get
   */
  @Override
  public void addGetResult(final Result result) {
    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
  }

  /**
   * Adds the size of a list of scan results to the recorded SCAN consumption.
   * @param results the results returned by the scan call
   */
  @Override
  public void addScanResult(final List<Result> results) {
    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
  }

  /**
   * Adds the size of a mutation to the recorded MUTATE consumption.
   * @param mutation the mutation that was applied
   */
  @Override
  public void addMutation(final Mutation mutation) {
    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
  }

  /**
   * Update estimate quota(read/write size/capacityUnits) which will be consumed
   * @param numWrites the number of write requests
   * @param numReads  the number of read requests
   */
  protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
    writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);

    if (useResultSizeBytes) {
      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
    } else {
      // assume 1 block required for reads. this is probably a low estimate, which is okay
      readConsumed = numReads > 0 ? blockSizeBytes : 0;
    }

    writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
  }

  /**
   * Update estimate quota(read/write size/capacityUnits) which will be consumed
   * @param scanRequest                     the scan to be executed
   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
   *                                        scanner
   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
   *                                        calls
   */
  protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
    if (useResultSizeBytes) {
      readConsumed = estimateConsume(OperationType.SCAN, 1, 1000);
    } else {
      long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
      // never estimate more than the configured proportion of the smallest read limit
      readConsumed = Math.min(maxScanEstimate, estimate);
    }

    readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
  }

  /**
   * Estimates the read bytes the next scan call will consume.
   * @param blockSizeBytes                  block size in bytes
   * @param nextCallSeq                     sequence number of the upcoming next call
   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
   *                                        scanner
   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
   *                                        calls
   * @return one block for the first call; for a growing workload the smaller of
   *         {@code maxScannerResultSize} and {@code nextCallSeq * maxBlockBytesScanned}
   *         (saturated at {@code Long.MAX_VALUE} instead of overflowing); otherwise
   *         {@code maxBlockBytesScanned}
   */
  protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
    long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
    /*
     * Estimating scan workload is more complicated, and if we severely underestimate workloads then
     * throttled clients will exhaust retries too quickly, and could saturate the RPC layer
     */
    if (nextCallSeq == 0) {
      // start scanners with an optimistic 1 block IO estimate
      // it is better to underestimate a large scan in the beginning
      // than to overestimate, and block, a small scan
      return blockSizeBytes;
    }

    boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
    if (isWorkloadGrowing) {
      // if nextCallSeq > 0 and the workload is growing then our estimate
      // should consider that the workload may continue to increase
      long projectedBytes;
      if (nextCallSeq > 0 && maxBlockBytesScanned > Long.MAX_VALUE / nextCallSeq) {
        // The plain product would wrap around to a negative number and then win the
        // Math.min below; saturate so that maxScannerResultSize bounds the estimate instead.
        projectedBytes = Long.MAX_VALUE;
      } else {
        projectedBytes = nextCallSeq * maxBlockBytesScanned;
      }
      return Math.min(maxScannerResultSize, projectedBytes);
    } else {
      // if nextCallSeq > 0 and the workload is shrinking or flat
      // then our workload has likely plateaued. We can just rely on the existing
      // maxBlockBytesScanned as our estimate in this case.
      return maxBlockBytesScanned;
    }
  }

  /**
   * Returns {@code avgSize * numReqs} when there is at least one request, otherwise 0. The
   * {@code type} argument is currently not used by the computation.
   */
  private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
    if (numReqs > 0) {
      return avgSize * numReqs;
    }
    return 0;
  }

  /** Converts a size in bytes to write capacity units, rounding up. */
  private long calculateWriteCapacityUnit(final long size) {
    return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
  }

  /** Converts a size in bytes to read capacity units, rounding up. */
  private long calculateReadCapacityUnit(final long size) {
    return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
  }

  /** Returns write capacity units of the actual size minus those of the estimated size. */
  private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
    return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
  }

  /** Returns read capacity units of the actual size minus those of the estimated size. */
  private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
    return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
  }
}