001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
056
057/**
058 * Cache that keeps track of the quota settings for the users and tables that are interacting with
059 * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
060 * be returned and the request to fetch the quota information will be enqueued for the next refresh.
061 * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
062 * events. Later the Quotas will be pushed using the notification system.
063 */
064@InterfaceAudience.Private
065@InterfaceStability.Evolving
066public class QuotaCache implements Stoppable {
067  private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);
068
069  public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";
070  public static final String TABLE_REGION_STATES_CACHE_TTL_MS =
071    "hbase.quota.cache.ttl.region.states.ms";
072  public static final String REGION_SERVERS_SIZE_CACHE_TTL_MS =
073    "hbase.quota.cache.ttl.servers.size.ms";
074
075  // defines the request attribute key which, when provided, will override the request's username
076  // from the perspective of user quotas
077  public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
078    "hbase.quota.user.override.key";
079  private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
080  private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD
081
082  // for testing purpose only, enforce the cache to be always refreshed
083  static boolean TEST_FORCE_REFRESH = false;
084  // for testing purpose only, block cache refreshes to reliably verify state
085  static boolean TEST_BLOCK_REFRESH = false;
086
087  private final ConcurrentMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
088  private final ConcurrentMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
089  private final ConcurrentMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
090  private final ConcurrentMap<String, QuotaState> regionServerQuotaCache =
091    new ConcurrentHashMap<>();
092  private volatile boolean exceedThrottleQuotaEnabled = false;
093  // factors used to divide cluster scope quota into machine scope quota
094  private volatile double machineQuotaFactor = 1;
095  private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
096    new ConcurrentHashMap<>();
097  private final RegionServerServices rsServices;
098  private final String userOverrideRequestAttributeKey;
099
100  private QuotaRefresherChore refreshChore;
101  private boolean stopped = true;
102
103  public QuotaCache(final RegionServerServices rsServices) {
104    this.rsServices = rsServices;
105    this.userOverrideRequestAttributeKey =
106      rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
107  }
108
109  public void start() throws IOException {
110    stopped = false;
111
112    // TODO: This will be replaced once we have the notification bus ready.
113    Configuration conf = rsServices.getConfiguration();
114    int period = conf.getInt(REFRESH_CONF_KEY, REFRESH_DEFAULT_PERIOD);
115    refreshChore = new QuotaRefresherChore(conf, period, this);
116    rsServices.getChoreService().scheduleChore(refreshChore);
117  }
118
119  @Override
120  public void stop(final String why) {
121    if (refreshChore != null) {
122      LOG.debug("Stopping QuotaRefresherChore chore.");
123      refreshChore.shutdown(true);
124    }
125    stopped = true;
126  }
127
128  @Override
129  public boolean isStopped() {
130    return stopped;
131  }
132
133  /**
134   * Returns the limiter associated to the specified user/table.
135   * @param ugi   the user to limit
136   * @param table the table to limit
137   * @return the limiter associated to the specified user/table
138   */
139  public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableName table) {
140    if (table.isSystemTable()) {
141      return NoopQuotaLimiter.get();
142    }
143    return getUserQuotaState(ugi).getTableLimiter(table);
144  }
145
146  /**
147   * Returns the QuotaState associated to the specified user.
148   * @param ugi the user
149   * @return the quota info associated to specified user
150   */
151  public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
152    return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
153      () -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L));
154  }
155
156  /**
157   * Returns the limiter associated to the specified table.
158   * @param table the table to limit
159   * @return the limiter associated to the specified table
160   */
161  public QuotaLimiter getTableLimiter(final TableName table) {
162    return getQuotaState(this.tableQuotaCache, table).getGlobalLimiter();
163  }
164
165  /**
166   * Returns the limiter associated to the specified namespace.
167   * @param namespace the namespace to limit
168   * @return the limiter associated to the specified namespace
169   */
170  public QuotaLimiter getNamespaceLimiter(final String namespace) {
171    return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
172  }
173
174  /**
175   * Returns the limiter associated to the specified region server.
176   * @param regionServer the region server to limit
177   * @return the limiter associated to the specified region server
178   */
179  public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
180    return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
181  }
182
183  protected boolean isExceedThrottleQuotaEnabled() {
184    return exceedThrottleQuotaEnabled;
185  }
186
187  /**
188   * Applies a request attribute user override if available, otherwise returns the UGI's short
189   * username
190   * @param ugi The request's UserGroupInformation
191   */
192  private String getQuotaUserName(final UserGroupInformation ugi) {
193    if (userOverrideRequestAttributeKey == null) {
194      return ugi.getShortUserName();
195    }
196
197    Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
198    if (!rpcCall.isPresent()) {
199      return ugi.getShortUserName();
200    }
201
202    byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
203    if (override == null) {
204      return ugi.getShortUserName();
205    }
206    return Bytes.toString(override);
207  }
208
209  /**
210   * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
211   * returned and the quota request will be enqueued for the next cache refresh.
212   */
213  private <K> QuotaState getQuotaState(final ConcurrentMap<K, QuotaState> quotasMap, final K key) {
214    return computeIfAbsent(quotasMap, key, QuotaState::new);
215  }
216
217  void triggerCacheRefresh() {
218    refreshChore.triggerNow();
219  }
220
221  void forceSynchronousCacheRefresh() {
222    refreshChore.chore();
223  }
224
225  long getLastUpdate() {
226    return refreshChore.lastUpdate;
227  }
228
229  Map<String, QuotaState> getNamespaceQuotaCache() {
230    return namespaceQuotaCache;
231  }
232
233  Map<String, QuotaState> getRegionServerQuotaCache() {
234    return regionServerQuotaCache;
235  }
236
237  Map<TableName, QuotaState> getTableQuotaCache() {
238    return tableQuotaCache;
239  }
240
241  Map<String, UserQuotaState> getUserQuotaCache() {
242    return userQuotaCache;
243  }
244
245  // TODO: Remove this once we have the notification bus
246  private class QuotaRefresherChore extends ScheduledChore {
247    private long lastUpdate = 0;
248
249    // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
250    // So we cache the results to reduce that load.
251    private final RefreshableExpiringValueCache<ClusterMetrics> tableRegionStatesClusterMetrics;
252    private final RefreshableExpiringValueCache<Integer> regionServersSize;
253
254    public QuotaRefresherChore(Configuration conf, final int period, final Stoppable stoppable) {
255      super("QuotaRefresherChore", stoppable, period);
256
257      Duration tableRegionStatesCacheTtl =
258        Duration.ofMillis(conf.getLong(TABLE_REGION_STATES_CACHE_TTL_MS, period));
259      this.tableRegionStatesClusterMetrics =
260        new RefreshableExpiringValueCache<>("tableRegionStatesClusterMetrics",
261          tableRegionStatesCacheTtl, () -> rsServices.getConnection().getAdmin()
262            .getClusterMetrics(EnumSet.of(Option.SERVERS_NAME, Option.TABLE_TO_REGIONS_COUNT)));
263
264      Duration regionServersSizeCacheTtl =
265        Duration.ofMillis(conf.getLong(REGION_SERVERS_SIZE_CACHE_TTL_MS, period));
266      regionServersSize =
267        new RefreshableExpiringValueCache<>("regionServersSize", regionServersSizeCacheTtl,
268          () -> rsServices.getConnection().getAdmin().getRegionServers().size());
269    }
270
271    @Override
272    public synchronized boolean triggerNow() {
273      tableRegionStatesClusterMetrics.invalidate();
274      regionServersSize.invalidate();
275      return super.triggerNow();
276    }
277
278    @Override
279    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
280        justification = "I do not understand why the complaints, it looks good to me -- FIX")
281    protected void chore() {
282      while (TEST_BLOCK_REFRESH) {
283        LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
284        try {
285          Thread.sleep(10);
286        } catch (InterruptedException e) {
287          throw new RuntimeException(e);
288        }
289      }
290      // Prefetch online tables/namespaces
291      for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
292        if (table.isSystemTable()) {
293          continue;
294        }
295        QuotaCache.this.tableQuotaCache.computeIfAbsent(table, key -> new QuotaState());
296
297        final String ns = table.getNamespaceAsString();
298
299        QuotaCache.this.namespaceQuotaCache.computeIfAbsent(ns, key -> new QuotaState());
300      }
301
302      QuotaCache.this.regionServerQuotaCache
303        .computeIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, key -> new QuotaState());
304
305      updateQuotaFactors();
306      fetchNamespaceQuotaState();
307      fetchTableQuotaState();
308      fetchUserQuotaState();
309      fetchRegionServerQuotaState();
310      fetchExceedThrottleQuota();
311      lastUpdate = EnvironmentEdgeManager.currentTime();
312    }
313
314    private void fetchNamespaceQuotaState() {
315      fetch("namespace", QuotaCache.this.namespaceQuotaCache, new Fetcher<String, QuotaState>() {
316        @Override
317        public Get makeGet(final Map.Entry<String, QuotaState> entry) {
318          return QuotaUtil.makeGetForNamespaceQuotas(entry.getKey());
319        }
320
321        @Override
322        public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
323          return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
324            machineQuotaFactor);
325        }
326      });
327    }
328
329    private void fetchTableQuotaState() {
330      fetch("table", QuotaCache.this.tableQuotaCache, new Fetcher<TableName, QuotaState>() {
331        @Override
332        public Get makeGet(final Map.Entry<TableName, QuotaState> entry) {
333          return QuotaUtil.makeGetForTableQuotas(entry.getKey());
334        }
335
336        @Override
337        public Map<TableName, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
338          return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
339            tableMachineQuotaFactors);
340        }
341      });
342    }
343
344    private void fetchUserQuotaState() {
345      final Set<String> namespaces = QuotaCache.this.namespaceQuotaCache.keySet();
346      final Set<TableName> tables = QuotaCache.this.tableQuotaCache.keySet();
347      fetch("user", QuotaCache.this.userQuotaCache, new Fetcher<String, UserQuotaState>() {
348        @Override
349        public Get makeGet(final Map.Entry<String, UserQuotaState> entry) {
350          return QuotaUtil.makeGetForUserQuotas(entry.getKey(), tables, namespaces);
351        }
352
353        @Override
354        public Map<String, UserQuotaState> fetchEntries(final List<Get> gets) throws IOException {
355          return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
356            tableMachineQuotaFactors, machineQuotaFactor);
357        }
358      });
359    }
360
361    private void fetchRegionServerQuotaState() {
362      fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
363        new Fetcher<String, QuotaState>() {
364          @Override
365          public Get makeGet(final Map.Entry<String, QuotaState> entry) {
366            return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
367          }
368
369          @Override
370          public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
371            return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
372          }
373        });
374    }
375
376    private void fetchExceedThrottleQuota() {
377      try {
378        QuotaCache.this.exceedThrottleQuotaEnabled =
379          QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
380      } catch (IOException e) {
381        LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
382      }
383    }
384
385    private <K, V extends QuotaState> void fetch(final String type,
386      final ConcurrentMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
387      long now = EnvironmentEdgeManager.currentTime();
388      long refreshPeriod = getPeriod();
389      long evictPeriod = refreshPeriod * EVICT_PERIOD_FACTOR;
390
391      // Find the quota entries to update
392      List<Get> gets = new ArrayList<>();
393      List<K> toRemove = new ArrayList<>();
394      for (Map.Entry<K, V> entry : quotasMap.entrySet()) {
395        long lastUpdate = entry.getValue().getLastUpdate();
396        long lastQuery = entry.getValue().getLastQuery();
397        if (lastQuery > 0 && (now - lastQuery) >= evictPeriod) {
398          toRemove.add(entry.getKey());
399        } else if (TEST_FORCE_REFRESH || (now - lastUpdate) >= refreshPeriod) {
400          gets.add(fetcher.makeGet(entry));
401        }
402      }
403
404      for (final K key : toRemove) {
405        if (LOG.isTraceEnabled()) {
406          LOG.trace("evict " + type + " key=" + key);
407        }
408        quotasMap.remove(key);
409      }
410
411      // fetch and update the quota entries
412      if (!gets.isEmpty()) {
413        try {
414          for (Map.Entry<K, V> entry : fetcher.fetchEntries(gets).entrySet()) {
415            V quotaInfo = quotasMap.putIfAbsent(entry.getKey(), entry.getValue());
416            if (quotaInfo != null) {
417              quotaInfo.update(entry.getValue());
418            }
419
420            if (LOG.isTraceEnabled()) {
421              LOG.trace("refresh " + type + " key=" + entry.getKey() + " quotas=" + quotaInfo);
422            }
423          }
424        } catch (IOException e) {
425          LOG.warn("Unable to read " + type + " from quota table", e);
426        }
427      }
428    }
429
430    /**
431     * Update quota factors which is used to divide cluster scope quota into machine scope quota For
432     * user/namespace/user over namespace quota, use [1 / RSNum] as machine factor. For table/user
433     * over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum] as machine factor.
434     */
435    private void updateQuotaFactors() {
436      boolean hasTableQuotas = !tableQuotaCache.entrySet().isEmpty()
437        || userQuotaCache.values().stream().anyMatch(UserQuotaState::hasTableLimiters);
438      if (hasTableQuotas) {
439        updateTableMachineQuotaFactors();
440      } else {
441        updateOnlyMachineQuotaFactors();
442      }
443    }
444
445    /**
446     * This method is cheaper than {@link #updateTableMachineQuotaFactors()} and should be used if
447     * we don't have any table quotas in the cache.
448     */
449    private void updateOnlyMachineQuotaFactors() {
450      Optional<Integer> rsSize = regionServersSize.get();
451      if (rsSize.isPresent()) {
452        updateMachineQuotaFactors(rsSize.get());
453      } else {
454        regionServersSize.refresh();
455      }
456    }
457
458    /**
459     * This will call {@link #updateMachineQuotaFactors(int)}, and then update the table machine
460     * factors as well. This relies on a more expensive query for ClusterMetrics.
461     */
462    private void updateTableMachineQuotaFactors() {
463      Optional<ClusterMetrics> clusterMetricsMaybe = tableRegionStatesClusterMetrics.get();
464      if (!clusterMetricsMaybe.isPresent()) {
465        tableRegionStatesClusterMetrics.refresh();
466        return;
467      }
468      ClusterMetrics clusterMetrics = clusterMetricsMaybe.get();
469      updateMachineQuotaFactors(clusterMetrics.getServersName().size());
470
471      Map<TableName, RegionStatesCount> tableRegionStatesCount =
472        clusterMetrics.getTableRegionStatesCount();
473
474      // Update table machine quota factors
475      for (TableName tableName : tableQuotaCache.keySet()) {
476        if (tableRegionStatesCount.containsKey(tableName)) {
477          double factor = 1;
478          try {
479            long regionSize = tableRegionStatesCount.get(tableName).getOpenRegions();
480            if (regionSize == 0) {
481              factor = 0;
482            } else {
483              int localRegionSize = rsServices.getRegions(tableName).size();
484              factor = 1.0 * localRegionSize / regionSize;
485            }
486          } catch (IOException e) {
487            LOG.warn("Get table regions failed: {}", tableName, e);
488          }
489          tableMachineQuotaFactors.put(tableName, factor);
490        } else {
491          // TableName might have already been dropped (outdated)
492          tableMachineQuotaFactors.remove(tableName);
493        }
494      }
495    }
496
497    private void updateMachineQuotaFactors(int rsSize) {
498      if (rsSize != 0) {
499        // TODO if use rs group, the cluster limit should be shared by the rs group
500        machineQuotaFactor = 1.0 / rsSize;
501      }
502    }
503  }
504
505  static class RefreshableExpiringValueCache<T> {
506    private final String name;
507    private final LoadingCache<String, Optional<T>> cache;
508
509    RefreshableExpiringValueCache(String name, Duration refreshPeriod,
510      ThrowingSupplier<T> supplier) {
511      this.name = name;
512      this.cache =
513        CacheBuilder.newBuilder().expireAfterWrite(refreshPeriod.toMillis(), TimeUnit.MILLISECONDS)
514          .build(new CacheLoader<>() {
515            @Override
516            public Optional<T> load(String key) {
517              try {
518                return Optional.of(supplier.get());
519              } catch (Exception e) {
520                LOG.warn("Failed to refresh cache {}", name, e);
521                return Optional.empty();
522              }
523            }
524          });
525    }
526
527    Optional<T> get() {
528      return cache.getUnchecked(name);
529    }
530
531    void refresh() {
532      cache.refresh(name);
533    }
534
535    void invalidate() {
536      cache.invalidate(name);
537    }
538  }
539
540  @FunctionalInterface
541  static interface ThrowingSupplier<T> {
542    T get() throws Exception;
543  }
544
545  static interface Fetcher<Key, Value> {
546    Get makeGet(Map.Entry<Key, Value> entry);
547
548    Map<Key, Value> fetchEntries(List<Get> gets) throws IOException;
549  }
550}