/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Region Server Quota Manager. It is responsible to provide access to the quota information of each
 * user/table. The direct user of this class is the RegionServer that will get and check the
 * user/table quota for each operation (put, get, scan). The quota check is a noop for system
 * tables, when quota support or rpc throttling is disabled, and when every limiter that applies to
 * the user/table reports bypass (presumably because no quota is specified for it).
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RegionServerRpcQuotaManager {
  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);

  private final RegionServerServices rsServices;

  // Stays null until start() runs with quota support enabled; isQuotaEnabled() relies on that.
  private QuotaCache quotaCache;
  private volatile boolean rpcThrottleEnabled;
  // Storage for quota rpc throttle
  private final RpcThrottleStorage rpcThrottleStorage;

  /**
   * Creates the manager and the rpc throttle storage it reads the throttle switch from.
   * @param rsServices the region server services providing the ZooKeeper handle and configuration
   */
  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
    this.rsServices = rsServices;
    rpcThrottleStorage =
      new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
  }

  /**
   * Starts the quota cache and loads the current rpc throttle switch. Does nothing (besides
   * logging) when quota support is disabled in the configuration.
   * @param rpcScheduler the rpc scheduler; currently not used by this method
   * @throws IOException if the quota cache cannot be started or the throttle state cannot be read
   */
  public void start(final RpcScheduler rpcScheduler) throws IOException {
    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
      LOG.info("Quota support disabled");
      return;
    }

    LOG.info("Initializing RPC quota support");

    // Initialize quota cache
    quotaCache = new QuotaCache(rsServices);
    quotaCache.start();
    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
    LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
  }

  /** Stops the quota cache if quota support was started. */
  public void stop() {
    if (isQuotaEnabled()) {
      quotaCache.stop("shutdown");
    }
  }

  /** Returns whether rpc throttling is currently enabled on this region server. */
  protected boolean isRpcThrottleEnabled() {
    return rpcThrottleEnabled;
  }

  /** Returns true once {@link #start(RpcScheduler)} has created the quota cache. */
  private boolean isQuotaEnabled() {
    return quotaCache != null;
  }

  /**
   * Refreshes the rpc throttle switch. The new value is not taken from {@code enable} directly:
   * when {@code enable} differs from the cached value, the state is re-read from the rpc throttle
   * storage. The call is skipped (with a warning) when quota support is disabled or when the cached
   * value already equals {@code enable}.
   * @param enable the requested throttle state
   * @throws IOException if the throttle state cannot be read from the storage
   */
  public void switchRpcThrottle(boolean enable) throws IOException {
    if (!isQuotaEnabled()) {
      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
      return;
    }
    if (rpcThrottleEnabled == enable) {
      LOG.warn(
        "Skip switch rpc throttle because previous value {} is the same as current value {}",
        rpcThrottleEnabled, enable);
      return;
    }
    boolean previousEnabled = rpcThrottleEnabled;
    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
    LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
  }

  QuotaCache getQuotaCache() {
    return quotaCache;
  }

  /**
   * Returns the quota for an operation.
   * @param ugi            the user that is executing the operation
   * @param table          the table where the operation will be executed
   * @param blockSizeBytes the block size in bytes handed to the created quota; callers in this
   *                       class pass the region's minimum block size
   * @return the OperationQuota; the shared noop quota when quota support or rpc throttling is
   *         disabled, the table is a system table, or all consulted limiters report bypass
   */
  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
    final int blockSizeBytes) {
    if (!isQuotaEnabled() || table.isSystemTable() || !isRpcThrottleEnabled()) {
      return NoopOperationQuota.get();
    }

    UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
    QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
    boolean useNoop = userLimiter.isBypass();

    if (userQuotaState.hasBypassGlobals()) {
      // The user bypasses the global limiters, so only the user limiter is consulted.
      if (LOG.isTraceEnabled()) {
        LOG.trace("get quota for ugi={} table={} userLimiter={}", ugi, table, userLimiter);
      }
      if (!useNoop) {
        return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
          userLimiter);
      }
      return NoopOperationQuota.get();
    }

    QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
    QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
    QuotaLimiter rsLimiter =
      quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
    // Noop only if the user limiter and every global limiter are bypassed.
    useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
    boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
    // Guard kept on purpose: avoids the varargs array and boxing on this per-request path.
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "get quota for ugi={} table={} userLimiter={} tableLimiter={} nsLimiter={} rsLimiter={}"
          + " exceedThrottleQuotaEnabled={}",
        ugi, table, userLimiter, tableLimiter, nsLimiter, rsLimiter, exceedThrottleQuotaEnabled);
    }
    if (useNoop) {
      return NoopOperationQuota.get();
    }
    if (exceedThrottleQuotaEnabled) {
      return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
        rsLimiter, userLimiter, tableLimiter, nsLimiter);
    }
    return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
      userLimiter, tableLimiter, nsLimiter, rsLimiter);
  }

  /**
   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
   * available quota and to report the data/usage of the operation. This method is specific to scans
   * because estimating a scan's workload is more complicated than estimating the workload of a
   * get/put.
   * @param region                          the region where the operation will be performed
   * @param scanRequest                     the scan to be estimated against the quota
   * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
   * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
   *                                        scanner
   * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
   *                                        calls
   * @return the OperationQuota
   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
   */
  public OperationQuota checkScanQuota(final Region region,
    final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
    long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
    throws IOException, RpcThrottlingException {
    UserGroupInformation ugi = getRequestUgi();
    TableDescriptor tableDescriptor = region.getTableDescriptor();
    TableName table = tableDescriptor.getTableName();

    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
    try {
      quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
        prevBlockBytesScannedDifference);
    } catch (RpcThrottlingException e) {
      LOG.debug("Throttling exception for user={} table={} scan={}: {}", ugi.getUserName(), table,
        scanRequest.getScannerId(), e.getMessage());
      throw e;
    }
    return quota;
  }

  /**
   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
   * available quota and to report the data/usage of the operation. This method does not support
   * scans because estimating a scan's workload is more complicated than estimating the workload of
   * a get/put.
   * @param region the region where the operation will be performed
   * @param type   the operation type
   * @return the OperationQuota
   * @throws RpcThrottlingException   if the operation cannot be executed due to quota exceeded.
   * @throws IllegalArgumentException if {@code type} is not GET, MUTATE or CHECK_AND_MUTATE
   */
  public OperationQuota checkBatchQuota(final Region region,
    final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
    switch (type) {
      case GET:
        return this.checkBatchQuota(region, 0, 1);
      case MUTATE:
        return this.checkBatchQuota(region, 1, 0);
      case CHECK_AND_MUTATE:
        // A conditional mutation is accounted as one write plus one read.
        return this.checkBatchQuota(region, 1, 1);
      default:
        throw new IllegalArgumentException("Invalid operation type: " + type);
    }
  }

  /**
   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
   * available quota and to report the data/usage of the operation. This method does not support
   * scans because estimating a scan's workload is more complicated than estimating the workload of
   * a get/put.
   * @param region       the region where the operation will be performed
   * @param actions      the "multi" actions to perform
   * @param hasCondition whether the RegionAction has a condition
   * @return the OperationQuota
   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
   */
  public OperationQuota checkBatchQuota(final Region region,
    final List<ClientProtos.Action> actions, boolean hasCondition)
    throws IOException, RpcThrottlingException {
    int numWrites = 0;
    int numReads = 0;
    for (final ClientProtos.Action action : actions) {
      if (action.hasMutation()) {
        numWrites++;
        OperationQuota.OperationType operationType =
          QuotaUtil.getQuotaOperationType(action, hasCondition);
        // Mutations classified as CHECK_AND_MUTATE also count as a read.
        if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
          numReads++;
        }
      } else if (action.hasGet()) {
        numReads++;
      }
    }
    return checkBatchQuota(region, numWrites, numReads);
  }

  /**
   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
   * available quota and to report the data/usage of the operation.
   * @param region    the region where the operation will be performed
   * @param numWrites number of writes to perform
   * @param numReads  number of short-reads to perform
   * @return the OperationQuota
   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
   */
  private OperationQuota checkBatchQuota(final Region region, final int numWrites,
    final int numReads) throws IOException, RpcThrottlingException {
    UserGroupInformation ugi = getRequestUgi();
    TableDescriptor tableDescriptor = region.getTableDescriptor();
    TableName table = tableDescriptor.getTableName();

    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
    try {
      quota.checkBatchQuota(numWrites, numReads);
    } catch (RpcThrottlingException e) {
      LOG.debug("Throttling exception for user={} table={} numWrites={} numReads={}: {}",
        ugi.getUserName(), table, numWrites, numReads, e.getMessage());
      throw e;
    }
    return quota;
  }

  /**
   * Resolves the UGI the quota is charged to: the user of the current rpc request when there is
   * one, otherwise the current process user.
   * @return the UGI of the request user, or of {@link User#getCurrent()} outside an rpc context
   * @throws IOException if the current user cannot be determined
   */
  private UserGroupInformation getRequestUgi() throws IOException {
    Optional<User> requestUser = RpcServer.getRequestUser();
    // Not orElseGet(): User.getCurrent() throws a checked IOException.
    User user = requestUser.isPresent() ? requestUser.get() : User.getCurrent();
    return user.getUGI();
  }
}