001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 021 022import java.io.IOException; 023import java.time.Duration; 024import java.util.ArrayList; 025import java.util.EnumSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Optional; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.ConcurrentMap; 032import java.util.concurrent.TimeUnit; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.ClusterMetrics; 035import org.apache.hadoop.hbase.ClusterMetrics.Option; 036import org.apache.hadoop.hbase.ScheduledChore; 037import org.apache.hadoop.hbase.Stoppable; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Get; 040import org.apache.hadoop.hbase.client.RegionStatesCount; 041import org.apache.hadoop.hbase.ipc.RpcCall; 042import org.apache.hadoop.hbase.ipc.RpcServer; 043import org.apache.hadoop.hbase.regionserver.HRegionServer; 044import org.apache.hadoop.hbase.regionserver.RegionServerServices; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.security.UserGroupInformation; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.yetus.audience.InterfaceStability; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; 054import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; 055import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; 056 057/** 058 * Cache that keeps track of the quota settings for the users and tables that are interacting with 059 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will 060 * be returned and the request to fetch the quota information will be enqueued for the next refresh. 061 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss 062 * events. Later the Quotas will be pushed using the notification system. 063 */ 064@InterfaceAudience.Private 065@InterfaceStability.Evolving 066public class QuotaCache implements Stoppable { 067 private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); 068 069 public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; 070 public static final String TABLE_REGION_STATES_CACHE_TTL_MS = 071 "hbase.quota.cache.ttl.region.states.ms"; 072 public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS = 073 "hbase.quota.cache.ttl.servers.size.ms"; 074 075 // defines the request attribute key which, when provided, will override the request's username 076 // from the perspective of user quotas 077 public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY = 078 "hbase.quota.user.override.key"; 079 private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min 080 private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD 081 082 // for testing purpose only, enforce the cache to be always refreshed 083 static boolean TEST_FORCE_REFRESH = false; 084 // for testing purpose only, block cache refreshes to reliably verify state 085 static boolean TEST_BLOCK_REFRESH = false; 086 087 private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>(); 088 private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>(); 089 private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>(); 090 private final ConcurrentMap<String, QuotaState> regionServerQuotaCache = 091 new ConcurrentHashMap<>(); 092 private volatile boolean exceedThrottleQuotaEnabled = false; 093 // factors used to divide cluster scope quota into machine scope quota 094 private volatile double machineQuotaFactor = 1; 095 private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors = 096 new ConcurrentHashMap<>(); 097 private final RegionServerServices rsServices; 098 private final String userOverrideRequestAttributeKey; 099 100 private QuotaRefresherChore refreshChore; 101 private boolean stopped = true; 102 103 public QuotaCache(final RegionServerServices rsServices) { 104 this.rsServices = rsServices; 105 this.userOverrideRequestAttributeKey = 106 rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY); 107 } 108 109 public void start() throws IOException { 110 stopped = false; 111 112 // TODO: This will be replaced once we have the notification bus ready. 113 Configuration conf = rsServices.getConfiguration(); 114 int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD); 115 refreshChore = new QuotaRefresherChore(conf, period, this); 116 rsServices.getChoreService().scheduleChore(refreshChore); 117 } 118 119 @Override 120 public void stop(final String why) { 121 if (refreshChore != null) { 122 LOG.debug("Stopping QuotaRefresherChore chore."); 123 refreshChore.shutdown(true); 124 } 125 stopped = true; 126 } 127 128 @Override 129 public boolean isStopped() { 130 return stopped; 131 } 132 133 /** 134 * Returns the limiter associated to the specified user/table. 135 * @param ugi the user to limit 136 * @param table the table to limit 137 * @return the limiter associated to the specified user/table 138 */ 139 public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) { 140 if (table.isSystemTable()) { 141 return NoopQuotaLimiter.get(); 142 } 143 return getUserQuotaState(ugi).getTableLimiter(table); 144 } 145 146 /** 147 * Returns the QuotaState associated to the specified user. 148 * @param ugi the user 149 * @return the quota info associated to specified user 150 */ 151 public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { 152 return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), 153 () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L)); 154 } 155 156 /** 157 * Returns the limiter associated to the specified table. 158 * @param table the table to limit 159 * @return the limiter associated to the specified table 160 */ 161 public QuotaLimiter getTableLimiter(final TableName table) { 162 return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter(); 163 } 164 165 /** 166 * Returns the limiter associated to the specified namespace. 167 * @param namespace the namespace to limit 168 * @return the limiter associated to the specified namespace 169 */ 170 public QuotaLimiter getNamespaceLimiter(final String namespace) { 171 return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter(); 172 } 173 174 /** 175 * Returns the limiter associated to the specified region server. 176 * @param regionServer the region server to limit 177 * @return the limiter associated to the specified region server 178 */ 179 public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) { 180 return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter(); 181 } 182 183 protected boolean isExceedThrottleQuotaEnabled() { 184 return exceedThrottleQuotaEnabled; 185 } 186 187 /** 188 * Applies a request attribute user override if available, otherwise returns the UGI's short 189 * username 190 * @param ugi The request's UserGroupInformation 191 */ 192 private String getQuotaUserName(final UserGroupInformation ugi) { 193 if (userOverrideRequestAttributeKey == null) { 194 return ugi.getShortUserName(); 195 } 196 197 Optional<RpcCall> rpcCall = RpcServer.getCurrentCall(); 198 if (!rpcCall.isPresent()) { 199 return ugi.getShortUserName(); 200 } 201 202 byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey); 203 if (override == null) { 204 return ugi.getShortUserName(); 205 } 206 return Bytes.toString(override); 207 } 208 209 /** 210 * Returns the QuotaState requested. If the quota info is not in cache an empty one will be 211 * returned and the quota request will be enqueued for the next cache refresh. 212 */ 213 private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) { 214 return computeIfAbsent(quotasMap, key, QuotaState::new); 215 } 216 217 void triggerCacheRefresh() { 218 refreshChore.triggerNow(); 219 } 220 221 void forceSynchronousCacheRefresh() { 222 refreshChore.chore(); 223 } 224 225 long getLastUpdate() { 226 return refreshChore.lastUpdate; 227 } 228 229 Map<String, QuotaState> getNamespaceQuotaCache() { 230 return namespaceQuotaCache; 231 } 232 233 Map<String, QuotaState> getRegionServerQuotaCache() { 234 return regionServerQuotaCache; 235 } 236 237 Map<TableName, QuotaState> getTableQuotaCache() { 238 return tableQuotaCache; 239 } 240 241 Map<String, UserQuotaState> getUserQuotaCache() { 242 return userQuotaCache; 243 } 244 245 // TODO: Remove this once we have the notification bus 246 private class QuotaRefresherChore extends ScheduledChore { 247 private long lastUpdate = 0; 248 249 // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability. 250 // So we cache the results to reduce that load. 251 private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics; 252 private final RefreshableExpiringValueCache<Integer> regionServersSize; 253 254 public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) { 255 super("QuotaRefresherChore", stoppable, period); 256 257 Duration tableRegionStatesCacheTtl = 258 Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period)); 259 this.tableRegionStatesClusterMetrics = 260 new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics", 261 tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin() 262 .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT))); 263 264 Duration regionServersSizeCacheTtl = 265 Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period)); 266 regionServersSize = 267 new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl, 268 () -> rsServices.getConnection().getAdmin().getRegionServers().size()); 269 } 270 271 @Override 272 public synchronized boolean triggerNow() { 273 tableRegionStatesClusterMetrics.invalidate(); 274 regionServersSize.invalidate(); 275 return super.triggerNow(); 276 } 277 278 @Override 279 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES", 280 justification = "I do not understand why the complaints, it looks good to me -- FIX") 281 protected void chore() { 282 while (TEST_BLOCK_REFRESH) { 283 LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false"); 284 try { 285 Thread.sleep(10); 286 } catch (InterruptedException e) { 287 throw new RuntimeException(e); 288 } 289 } 290 // Prefetch online tables/namespaces 291 for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) { 292 if (table.isSystemTable()) { 293 continue; 294 } 295 QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState()); 296 297 final String ns = table.getNamespaceAsString(); 298 299 QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState()); 300 } 301 302 QuotaCache.this.regionServerQuotaCache 303 .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState()); 304 305 updateQuotaFactors(); 306 fetchNamespaceQuotaState(); 307 fetchTableQuotaState(); 308 fetchUserQuotaState(); 309 fetchRegionServerQuotaState(); 310 fetchExceedThrottleQuota(); 311 lastUpdate = EnvironmentEdgeManager.currentTime(); 312 } 313 314 private void fetchNamespaceQuotaState() { 315 fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() { 316 @Override 317 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 318 return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey()); 319 } 320 321 @Override 322 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 323 return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets, 324 machineQuotaFactor); 325 } 326 }); 327 } 328 329 private void fetchTableQuotaState() { 330 fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() { 331 @Override 332 public Get makeGet(final Map.Entry<TableName, QuotaState> entry) { 333 return QuotaUtil.makeGetForTableQuotas(entry.getKey()); 334 } 335 336 @Override 337 public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 338 return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets, 339 tableMachineQuotaFactors); 340 } 341 }); 342 } 343 344 private void fetchUserQuotaState() { 345 final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet(); 346 final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet(); 347 fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() { 348 @Override 349 public Get makeGet(final Map.Entry<String, UserQuotaState> entry) { 350 return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces); 351 } 352 353 @Override 354 public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException { 355 return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets, 356 tableMachineQuotaFactors, machineQuotaFactor); 357 } 358 }); 359 } 360 361 private void fetchRegionServerQuotaState() { 362 fetch("regionServer", QuotaCache.this.regionServerQuotaCache, 363 new Fetcher<String, QuotaState>() { 364 @Override 365 public Get makeGet(final Map.Entry<String, QuotaState> entry) { 366 return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey()); 367 } 368 369 @Override 370 public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException { 371 return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets); 372 } 373 }); 374 } 375 376 private void fetchExceedThrottleQuota() { 377 try { 378 QuotaCache.this.exceedThrottleQuotaEnabled = 379 QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection()); 380 } catch (IOException e) { 381 LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e); 382 } 383 } 384 385 private <K, V extends QuotaState> void fetch(final String type, 386 final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) { 387 long now = EnvironmentEdgeManager.currentTime(); 388 long refreshPeriod = getPeriod(); 389 long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR; 390 391 // Find the quota entries to update 392 List<Get> gets = new ArrayList<>(); 393 List<K> toRemove = new ArrayList<>(); 394 for (Map.Entry<K, V> entry : quotasMap.entrySet()) { 395 long lastUpdate = entry.getValue().getLastUpdate(); 396 long lastQuery = entry.getValue().getLastQuery(); 397 if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) { 398 toRemove.add(entry.getKey()); 399 } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) { 400 gets.add(fetcher.makeGet(entry)); 401 } 402 } 403 404 for (final K key : toRemove) { 405 if (LOG.isTraceEnabled()) { 406 LOG.trace("evict " + type + " key=" + key); 407 } 408 quotasMap.remove(key); 409 } 410 411 // fetch and update the quota entries 412 if (!gets.isEmpty()) { 413 try { 414 for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) { 415 V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue()); 416 if (quotaInfo != null) { 417 quotaInfo.update(entry.getValue()); 418 } 419 420 if (LOG.isTraceEnabled()) { 421 LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo); 422 } 423 } 424 } catch (IOException e) { 425 LOG.warn("Unable to read " + type + " from quota table", e); 426 } 427 } 428 } 429 430 /** 431 * Update quota factors which is used to divide cluster scope quota into machine scope quota For 432 * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user 433 * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor. 434 */ 435 private void updateQuotaFactors() { 436 boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty() 437 || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters); 438 if (hasTableQuotas) { 439 updateTableMachineQuotaFactors(); 440 } else { 441 updateOnlyMachineQuotaFactors(); 442 } 443 } 444 445 /** 446 * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if 447 * we don't have any table quotas in the cache. 448 */ 449 private void updateOnlyMachineQuotaFactors() { 450 Optional<Integer> rsSize = regionServersSize.get(); 451 if (rsSize.isPresent()) { 452 updateMachineQuotaFactors(rsSize.get()); 453 } else { 454 regionServersSize.refresh(); 455 } 456 } 457 458 /** 459 * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine 460 * factors as well. This relies on a more expensive query for ClusterMetrics. 461 */ 462 private void updateTableMachineQuotaFactors() { 463 Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get(); 464 if (!clusterMetricsMaybe.isPresent()) { 465 tableRegionStatesClusterMetrics.refresh(); 466 return; 467 } 468 ClusterMetrics clusterMetrics = clusterMetricsMaybe.get(); 469 updateMachineQuotaFactors(clusterMetrics.getServersName().size()); 470 471 Map<TableName, RegionStatesCount> tableRegionStatesCount = 472 clusterMetrics.getTableRegionStatesCount(); 473 474 // Update table machine quota factors 475 for (TableName tableName : tableQuotaCache.keySet()) { 476 if (tableRegionStatesCount.containsKey(tableName)) { 477 double factor = 1; 478 try { 479 long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions(); 480 if (regionSize == 0) { 481 factor = 0; 482 } else { 483 int localRegionSize = rsServices.getRegions(tableName).size(); 484 factor = 1.0 * localRegionSize / regionSize; 485 } 486 } catch (IOException e) { 487 LOG.warn("Get table regions failed: {}", tableName, e); 488 } 489 tableMachineQuotaFactors.put(tableName, factor); 490 } else { 491 // TableName might have already been dropped (outdated) 492 tableMachineQuotaFactors.remove(tableName); 493 } 494 } 495 } 496 497 private void updateMachineQuotaFactors(int rsSize) { 498 if (rsSize != 0) { 499 // TODO if use rs group, the cluster limit should be shared by the rs group 500 machineQuotaFactor = 1.0 / rsSize; 501 } 502 } 503 } 504 505 static class RefreshableExpiringValueCache<T> { 506 private final String name; 507 private final LoadingCache<String, Optional<T>> cache; 508 509 RefreshableExpiringValueCache(String name, Duration refreshPeriod, 510 ThrowingSupplier<T> supplier) { 511 this.name = name; 512 this.cache = 513 CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS) 514 .build(new CacheLoader<>() { 515 @Override 516 public Optional<T> load(String key) { 517 try { 518 return Optional.of(supplier.get()); 519 } catch (Exception e) { 520 LOG.warn("Failed to refresh cache {}", name, e); 521 return Optional.empty(); 522 } 523 } 524 }); 525 } 526 527 Optional<T> get() { 528 return cache.getUnchecked(name); 529 } 530 531 void refresh() { 532 cache.refresh(name); 533 } 534 535 void invalidate() { 536 cache.invalidate(name); 537 } 538 } 539 540 @FunctionalInterface 541 static interface ThrowingSupplier<T> { 542 T get() throws Exception; 543 } 544 545 static interface Fetcher<Key, Value> { 546 Get makeGet(Map.Entry<Key, Value> entry); 547 548 Map<Key, Value> fetchEntries(List<Get> gets) throws IOException; 549 } 550}