001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Objects;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.TimeUnit;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ScheduledChore;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.RegionInfo;
035import org.apache.hadoop.hbase.client.RegionReplicaUtil;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.master.HMaster;
038import org.apache.hadoop.hbase.master.MetricsMaster;
039import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
046import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
047import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
048
049import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
050
051/**
052 * Reads the currently received Region filesystem-space use reports and acts on those which violate
053 * a defined quota.
054 */
055@InterfaceAudience.Private
056public class QuotaObserverChore extends ScheduledChore {
057  private static final Logger LOG = LoggerFactory.getLogger(QuotaObserverChore.class);
058  static final String QUOTA_OBSERVER_CHORE_PERIOD_KEY = "hbase.master.quotas.observer.chore.period";
059  static final int QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minutes in millis
060
061  static final String QUOTA_OBSERVER_CHORE_DELAY_KEY = "hbase.master.quotas.observer.chore.delay";
062  static final long QUOTA_OBSERVER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
063
064  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY =
065    "hbase.master.quotas.observer.chore.timeunit";
066  static final String QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
067
068  static final String QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY =
069    "hbase.master.quotas.observer.report.percent";
070  static final double QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT = 0.95;
071
072  static final String REGION_REPORT_RETENTION_DURATION_KEY =
073    "hbase.master.quotas.region.report.retention.millis";
074  static final long REGION_REPORT_RETENTION_DURATION_DEFAULT = 1000 * 60 * 10; // 10 minutes
075
076  private final Connection conn;
077  private final Configuration conf;
078  private final MasterQuotaManager quotaManager;
079  private final MetricsMaster metrics;
080  /*
081   * Callback that changes in quota snapshots are passed to.
082   */
083  private final SpaceQuotaSnapshotNotifier snapshotNotifier;
084
085  /*
086   * Preserves the state of quota snapshots for tables and namespaces
087   */
088  private final Map<TableName, SpaceQuotaSnapshot> tableQuotaSnapshots;
089  private final Map<TableName, SpaceQuotaSnapshot> readOnlyTableQuotaSnapshots;
090  private final Map<String, SpaceQuotaSnapshot> namespaceQuotaSnapshots;
091  private final Map<String, SpaceQuotaSnapshot> readOnlyNamespaceSnapshots;
092
093  // The time, in millis, that region reports should be kept by the master
094  private final long regionReportLifetimeMillis;
095
096  /*
097   * Encapsulates logic for tracking the state of a table/namespace WRT space quotas
098   */
099  private QuotaSnapshotStore<TableName> tableSnapshotStore;
100  private QuotaSnapshotStore<String> namespaceSnapshotStore;
101
102  public QuotaObserverChore(HMaster master, MetricsMaster metrics) {
103    this(master.getConnection(), master.getConfiguration(), master.getSpaceQuotaSnapshotNotifier(),
104      master.getMasterQuotaManager(), master, metrics);
105  }
106
107  QuotaObserverChore(Connection conn, Configuration conf,
108    SpaceQuotaSnapshotNotifier snapshotNotifier, MasterQuotaManager quotaManager, Stoppable stopper,
109    MetricsMaster metrics) {
110    super(QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf), getInitialDelay(conf),
111      getTimeUnit(conf));
112    this.conn = conn;
113    this.conf = conf;
114    this.metrics = metrics;
115    this.quotaManager = quotaManager;
116    this.snapshotNotifier = Objects.requireNonNull(snapshotNotifier);
117    this.tableQuotaSnapshots = new ConcurrentHashMap<>();
118    this.readOnlyTableQuotaSnapshots = Collections.unmodifiableMap(tableQuotaSnapshots);
119    this.namespaceQuotaSnapshots = new ConcurrentHashMap<>();
120    this.readOnlyNamespaceSnapshots = Collections.unmodifiableMap(namespaceQuotaSnapshots);
121    this.regionReportLifetimeMillis =
122      conf.getLong(REGION_REPORT_RETENTION_DURATION_KEY, REGION_REPORT_RETENTION_DURATION_DEFAULT);
123  }
124
125  @Override
126  protected void chore() {
127    try {
128      if (LOG.isTraceEnabled()) {
129        LOG.trace("Refreshing space quotas in RegionServer");
130      }
131      long start = System.nanoTime();
132      _chore();
133      if (metrics != null) {
134        metrics.incrementQuotaObserverTime((System.nanoTime() - start) / 1_000_000);
135      }
136    } catch (IOException e) {
137      LOG.warn("Failed to process quota reports and update quota state. Will retry.", e);
138    }
139  }
140
141  void _chore() throws IOException {
142    // Get the total set of tables that have quotas defined. Includes table quotas
143    // and tables included by namespace quotas.
144    TablesWithQuotas tablesWithQuotas = fetchAllTablesWithQuotasDefined();
145    if (LOG.isTraceEnabled()) {
146      LOG.trace("Found following tables with quotas: " + tablesWithQuotas);
147    }
148
149    if (metrics != null) {
150      // Set the number of namespaces and tables with quotas defined
151      metrics.setNumSpaceQuotas(tablesWithQuotas.getTableQuotaTables().size()
152        + tablesWithQuotas.getNamespacesWithQuotas().size());
153    }
154
155    // The current "view" of region space use. Used henceforth.
156    final Map<RegionInfo, Long> reportedRegionSpaceUse = quotaManager.snapshotRegionSizes();
157    if (LOG.isTraceEnabled()) {
158      LOG.trace("Using " + reportedRegionSpaceUse.size() + " region space use reports: "
159        + reportedRegionSpaceUse);
160    }
161
162    // Remove the "old" region reports
163    pruneOldRegionReports();
164
165    // Create the stores to track table and namespace snapshots
166    initializeSnapshotStores(reportedRegionSpaceUse);
167    // Report the number of (non-expired) region size reports
168    if (metrics != null) {
169      metrics.setNumRegionSizeReports(reportedRegionSpaceUse.size());
170    }
171
172    // Filter out tables for which we don't have adequate regionspace reports yet.
173    // Important that we do this after we instantiate the stores above
174    // This gives us a set of Tables which may or may not be violating their quota.
175    // To be safe, we want to make sure that these are not in violation.
176    Set<TableName> tablesInLimbo =
177      tablesWithQuotas.filterInsufficientlyReportedTables(tableSnapshotStore);
178
179    if (LOG.isTraceEnabled()) {
180      LOG.trace("Filtered insufficiently reported tables, left with "
181        + reportedRegionSpaceUse.size() + " regions reported");
182    }
183
184    for (TableName tableInLimbo : tablesInLimbo) {
185      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(tableInLimbo);
186      SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
187      if (currentStatus.isInViolation()) {
188        if (LOG.isTraceEnabled()) {
189          LOG.trace("Moving " + tableInLimbo + " out of violation because fewer region sizes were"
190            + " reported than required.");
191        }
192        SpaceQuotaSnapshot targetSnapshot =
193          new SpaceQuotaSnapshot(SpaceQuotaStatus.notInViolation(), currentSnapshot.getUsage(),
194            currentSnapshot.getLimit());
195        this.snapshotNotifier.transitionTable(tableInLimbo, targetSnapshot);
196        // Update it in the Table QuotaStore so that memory is consistent with no violation.
197        tableSnapshotStore.setCurrentState(tableInLimbo, targetSnapshot);
198        // In case of Disable SVP, we need to enable the table as it moves out of violation
199        if (SpaceViolationPolicy.DISABLE == currentStatus.getPolicy().orElse(null)) {
200          QuotaUtil.enableTableIfNotEnabled(conn, tableInLimbo);
201        }
202      }
203    }
204
205    // Transition each table to/from quota violation based on the current and target state.
206    // Only table quotas are enacted.
207    final Set<TableName> tablesWithTableQuotas = tablesWithQuotas.getTableQuotaTables();
208    processTablesWithQuotas(tablesWithTableQuotas);
209
210    // For each Namespace quota, transition each table in the namespace in or out of violation
211    // only if a table quota violation policy has not already been applied.
212    final Set<String> namespacesWithQuotas = tablesWithQuotas.getNamespacesWithQuotas();
213    final Multimap<String, TableName> tablesByNamespace = tablesWithQuotas.getTablesByNamespace();
214    processNamespacesWithQuotas(namespacesWithQuotas, tablesByNamespace);
215  }
216
217  void initializeSnapshotStores(Map<RegionInfo, Long> regionSizes) {
218    Map<RegionInfo, Long> immutableRegionSpaceUse = Collections.unmodifiableMap(regionSizes);
219    if (tableSnapshotStore == null) {
220      tableSnapshotStore = new TableQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
221    } else {
222      tableSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
223    }
224    if (namespaceSnapshotStore == null) {
225      namespaceSnapshotStore = new NamespaceQuotaSnapshotStore(conn, this, immutableRegionSpaceUse);
226    } else {
227      namespaceSnapshotStore.setRegionUsage(immutableRegionSpaceUse);
228    }
229  }
230
231  /**
232   * Processes each {@code TableName} which has a quota defined and moves it in or out of violation
233   * based on the space use.
234   * @param tablesWithTableQuotas The HBase tables which have quotas defined
235   */
236  void processTablesWithQuotas(final Set<TableName> tablesWithTableQuotas) throws IOException {
237    long numTablesInViolation = 0L;
238    for (TableName table : tablesWithTableQuotas) {
239      final SpaceQuota spaceQuota = tableSnapshotStore.getSpaceQuota(table);
240      if (spaceQuota == null) {
241        if (LOG.isDebugEnabled()) {
242          LOG.debug("Unexpectedly did not find a space quota for " + table
243            + ", maybe it was recently deleted.");
244        }
245        continue;
246      }
247      final SpaceQuotaSnapshot currentSnapshot = tableSnapshotStore.getCurrentState(table);
248      final SpaceQuotaSnapshot targetSnapshot =
249        tableSnapshotStore.getTargetState(table, spaceQuota);
250      if (LOG.isTraceEnabled()) {
251        LOG.trace("Processing " + table + " with current=" + currentSnapshot + ", target="
252          + targetSnapshot);
253      }
254      updateTableQuota(table, currentSnapshot, targetSnapshot);
255
256      if (targetSnapshot.getQuotaStatus().isInViolation()) {
257        numTablesInViolation++;
258      }
259    }
260    // Report the number of tables in violation
261    if (metrics != null) {
262      metrics.setNumTableInSpaceQuotaViolation(numTablesInViolation);
263    }
264  }
265
266  /**
267   * Processes each namespace which has a quota defined and moves all of the tables contained in
268   * that namespace into or out of violation of the quota. Tables which are already in violation of
269   * a quota at the table level which <em>also</em> have a reside in a namespace with a violated
270   * quota will not have the namespace quota enacted. The table quota takes priority over the
271   * namespace quota.
272   * @param namespacesWithQuotas The set of namespaces that have quotas defined
273   * @param tablesByNamespace    A mapping of namespaces and the tables contained in those
274   *                             namespaces
275   */
276  void processNamespacesWithQuotas(final Set<String> namespacesWithQuotas,
277    final Multimap<String, TableName> tablesByNamespace) throws IOException {
278    long numNamespacesInViolation = 0L;
279    for (String namespace : namespacesWithQuotas) {
280      // Get the quota definition for the namespace
281      final SpaceQuota spaceQuota = namespaceSnapshotStore.getSpaceQuota(namespace);
282      if (spaceQuota == null) {
283        if (LOG.isDebugEnabled()) {
284          LOG.debug("Could not get Namespace space quota for " + namespace
285            + ", maybe it was recently deleted.");
286        }
287        continue;
288      }
289      final SpaceQuotaSnapshot currentSnapshot = namespaceSnapshotStore.getCurrentState(namespace);
290      final SpaceQuotaSnapshot targetSnapshot =
291        namespaceSnapshotStore.getTargetState(namespace, spaceQuota);
292      if (LOG.isTraceEnabled()) {
293        LOG.trace("Processing " + namespace + " with current=" + currentSnapshot + ", target="
294          + targetSnapshot);
295      }
296      updateNamespaceQuota(namespace, currentSnapshot, targetSnapshot, tablesByNamespace);
297
298      if (targetSnapshot.getQuotaStatus().isInViolation()) {
299        numNamespacesInViolation++;
300      }
301    }
302
303    // Report the number of namespaces in violation
304    if (metrics != null) {
305      metrics.setNumNamespacesInSpaceQuotaViolation(numNamespacesInViolation);
306    }
307  }
308
309  /**
310   * Updates the hbase:quota table with the new quota policy for this <code>table</code> if
311   * necessary.
312   * @param table           The table being checked
313   * @param currentSnapshot The state of the quota on this table from the previous invocation.
314   * @param targetSnapshot  The state the quota should be in for this table.
315   */
316  void updateTableQuota(TableName table, SpaceQuotaSnapshot currentSnapshot,
317    SpaceQuotaSnapshot targetSnapshot) throws IOException {
318    final SpaceQuotaStatus currentStatus = currentSnapshot.getQuotaStatus();
319    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
320
321    // If we're changing something, log it.
322    if (!currentSnapshot.equals(targetSnapshot)) {
323      this.snapshotNotifier.transitionTable(table, targetSnapshot);
324      // Update it in memory
325      tableSnapshotStore.setCurrentState(table, targetSnapshot);
326
327      // If the target is none, we're moving out of violation. Update the hbase:quota table
328      SpaceViolationPolicy currPolicy = currentStatus.getPolicy().orElse(null);
329      SpaceViolationPolicy targetPolicy = targetStatus.getPolicy().orElse(null);
330      if (!targetStatus.isInViolation()) {
331        // In case of Disable SVP, we need to enable the table as it moves out of violation
332        if (isDisableSpaceViolationPolicy(currPolicy, targetPolicy)) {
333          QuotaUtil.enableTableIfNotEnabled(conn, table);
334        }
335        if (LOG.isDebugEnabled()) {
336          LOG.debug(table + " moved into observance of table space quota.");
337        }
338      } else {
339        // We're either moving into violation or changing violation policies
340        if (currPolicy != targetPolicy && SpaceViolationPolicy.DISABLE == currPolicy) {
341          // In case of policy switch, we need to enable the table if current policy is Disable SVP
342          QuotaUtil.enableTableIfNotEnabled(conn, table);
343        } else if (SpaceViolationPolicy.DISABLE == targetPolicy) {
344          // In case of Disable SVP, we need to disable the table as it moves into violation
345          QuotaUtil.disableTableIfNotDisabled(conn, table);
346        }
347        if (LOG.isDebugEnabled()) {
348          LOG.debug(
349            table + " moved into violation of table space quota with policy of " + targetPolicy);
350        }
351      }
352    } else if (LOG.isTraceEnabled()) {
353      // Policies are the same, so we have nothing to do except log this. Don't need to re-update
354      // the quota table
355      if (!currentStatus.isInViolation()) {
356        LOG.trace(table + " remains in observance of quota.");
357      } else {
358        LOG.trace(table + " remains in violation of quota.");
359      }
360    }
361  }
362
363  /**
364   * Method to check whether we are dealing with DISABLE {@link SpaceViolationPolicy}. In such a
365   * case, currPolicy or/and targetPolicy will be having DISABLE policy.
366   * @param currPolicy   currently set space violation policy
367   * @param targetPolicy new space violation policy
368   * @return true if is DISABLE space violation policy; otherwise false
369   */
370  private boolean isDisableSpaceViolationPolicy(final SpaceViolationPolicy currPolicy,
371    final SpaceViolationPolicy targetPolicy) {
372    return SpaceViolationPolicy.DISABLE == currPolicy
373      || SpaceViolationPolicy.DISABLE == targetPolicy;
374  }
375
376  /**
377   * Updates the hbase:quota table with the target quota policy for this <code>namespace</code> if
378   * necessary.
379   * @param namespace         The namespace being checked
380   * @param currentSnapshot   The state of the quota on this namespace from the previous invocation
381   * @param targetSnapshot    The state the quota should be in for this namespace
382   * @param tablesByNamespace A mapping of tables in namespaces.
383   */
384  void updateNamespaceQuota(String namespace, SpaceQuotaSnapshot currentSnapshot,
385    SpaceQuotaSnapshot targetSnapshot, final Multimap<String, TableName> tablesByNamespace)
386    throws IOException {
387    final SpaceQuotaStatus targetStatus = targetSnapshot.getQuotaStatus();
388
389    // When the policies differ, we need to move into or out of violation
390    if (!currentSnapshot.equals(targetSnapshot)) {
391      // We want to have a policy of "NONE", moving out of violation
392      if (!targetStatus.isInViolation()) {
393        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
394          // If there is a quota on this table in violation
395          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
396            // Table-level quota violation policy is being applied here.
397            if (LOG.isTraceEnabled()) {
398              LOG.trace("Not activating Namespace violation policy because a Table violation"
399                + " policy is already in effect for " + tableInNS);
400            }
401          } else {
402            LOG.info(tableInNS + " moving into observance of namespace space quota");
403            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
404          }
405        }
406        // We want to move into violation at the NS level
407      } else {
408        // Moving tables in the namespace into violation or to a different violation policy
409        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
410          final SpaceQuotaSnapshot tableQuotaSnapshot =
411            tableSnapshotStore.getCurrentState(tableInNS);
412          final boolean hasTableQuota =
413            !Objects.equals(QuotaSnapshotStore.NO_QUOTA, tableQuotaSnapshot);
414          if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
415            // Table-level quota violation policy is being applied here.
416            if (LOG.isTraceEnabled()) {
417              LOG.trace("Not activating Namespace violation policy because a Table violation"
418                + " policy is already in effect for " + tableInNS);
419            }
420          } else {
421            // No table quota present or a table quota present that is not in violation
422            LOG.info(tableInNS + " moving into violation of namespace space quota with policy "
423              + targetStatus.getPolicy());
424            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
425          }
426        }
427      }
428      // Update the new state in memory for this namespace
429      namespaceSnapshotStore.setCurrentState(namespace, targetSnapshot);
430    } else {
431      // Policies are the same
432      if (!targetStatus.isInViolation()) {
433        // Both are NONE, so we remain in observance
434        if (LOG.isTraceEnabled()) {
435          LOG.trace(namespace + " remains in observance of quota.");
436        }
437      } else {
438        // Namespace quota is still in violation, need to enact if the table quota is not
439        // taking priority.
440        for (TableName tableInNS : tablesByNamespace.get(namespace)) {
441          // Does a table policy exist
442          if (tableSnapshotStore.getCurrentState(tableInNS).getQuotaStatus().isInViolation()) {
443            // Table-level quota violation policy is being applied here.
444            if (LOG.isTraceEnabled()) {
445              LOG.trace("Not activating Namespace violation policy because Table violation"
446                + " policy is already in effect for " + tableInNS);
447            }
448          } else {
449            // No table policy, so enact namespace policy
450            LOG.info(tableInNS + " moving into violation of namespace space quota");
451            this.snapshotNotifier.transitionTable(tableInNS, targetSnapshot);
452          }
453        }
454      }
455    }
456  }
457
458  /**
459   * Removes region reports over a certain age.
460   */
461  void pruneOldRegionReports() {
462    final long now = EnvironmentEdgeManager.currentTime();
463    final long pruneTime = now - regionReportLifetimeMillis;
464    final int numRemoved = quotaManager.pruneEntriesOlderThan(pruneTime, this);
465    if (LOG.isTraceEnabled()) {
466      LOG.trace("Removed " + numRemoved + " old region size reports that were older than "
467        + pruneTime + ".");
468    }
469  }
470
471  /**
472   * Computes the set of all tables that have quotas defined. This includes tables with quotas
473   * explicitly set on them, in addition to tables that exist namespaces which have a quota defined.
474   */
475  TablesWithQuotas fetchAllTablesWithQuotasDefined() throws IOException {
476    final Scan scan = QuotaTableUtil.makeScan(null);
477    final TablesWithQuotas tablesWithQuotas = new TablesWithQuotas(conn, conf);
478    try (final QuotaRetriever scanner = new QuotaRetriever(conn, scan)) {
479      for (QuotaSettings quotaSettings : scanner) {
480        // Only one of namespace and tablename should be 'null'
481        final String namespace = quotaSettings.getNamespace();
482        final TableName tableName = quotaSettings.getTableName();
483        if (QuotaType.SPACE != quotaSettings.getQuotaType()) {
484          continue;
485        }
486
487        if (namespace != null) {
488          assert tableName == null;
489          // Collect all of the tables in the namespace
490          TableName[] tablesInNS = conn.getAdmin().listTableNamesByNamespace(namespace);
491          for (TableName tableUnderNs : tablesInNS) {
492            if (LOG.isTraceEnabled()) {
493              LOG.trace(
494                "Adding " + tableUnderNs + " under " + namespace + " as having a namespace quota");
495            }
496            tablesWithQuotas.addNamespaceQuotaTable(tableUnderNs);
497          }
498        } else {
499          assert tableName != null;
500          if (LOG.isTraceEnabled()) {
501            LOG.trace("Adding " + tableName + " as having table quota.");
502          }
503          // namespace is already null, must be a non-null tableName
504          tablesWithQuotas.addTableQuotaTable(tableName);
505        }
506      }
507      return tablesWithQuotas;
508    }
509  }
510
511  QuotaSnapshotStore<TableName> getTableSnapshotStore() {
512    return tableSnapshotStore;
513  }
514
515  QuotaSnapshotStore<String> getNamespaceSnapshotStore() {
516    return namespaceSnapshotStore;
517  }
518
519  /**
520   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects for each HBase
521   * table with a quota defined.
522   */
523  public Map<TableName, SpaceQuotaSnapshot> getTableQuotaSnapshots() {
524    return readOnlyTableQuotaSnapshots;
525  }
526
527  /**
528   * Returns an unmodifiable view over the current {@link SpaceQuotaSnapshot} objects for each HBase
529   * namespace with a quota defined.
530   */
531  public Map<String, SpaceQuotaSnapshot> getNamespaceQuotaSnapshots() {
532    return readOnlyNamespaceSnapshots;
533  }
534
535  /**
536   * Fetches the {@link SpaceQuotaSnapshot} for the given table.
537   */
538  SpaceQuotaSnapshot getTableQuotaSnapshot(TableName table) {
539    SpaceQuotaSnapshot state = this.tableQuotaSnapshots.get(table);
540    if (state == null) {
541      // No tracked state implies observance.
542      return QuotaSnapshotStore.NO_QUOTA;
543    }
544    return state;
545  }
546
547  /**
548   * Stores the quota state for the given table.
549   */
550  void setTableQuotaSnapshot(TableName table, SpaceQuotaSnapshot snapshot) {
551    this.tableQuotaSnapshots.put(table, snapshot);
552  }
553
554  /**
555   * Fetches the {@link SpaceQuotaSnapshot} for the given namespace from this chore.
556   */
557  SpaceQuotaSnapshot getNamespaceQuotaSnapshot(String namespace) {
558    SpaceQuotaSnapshot state = this.namespaceQuotaSnapshots.get(namespace);
559    if (state == null) {
560      // No tracked state implies observance.
561      return QuotaSnapshotStore.NO_QUOTA;
562    }
563    return state;
564  }
565
566  /**
567   * Stores the given {@code snapshot} for the given {@code namespace} in this chore.
568   */
569  void setNamespaceQuotaSnapshot(String namespace, SpaceQuotaSnapshot snapshot) {
570    this.namespaceQuotaSnapshots.put(namespace, snapshot);
571  }
572
573  /**
574   * Extracts the period for the chore from the configuration.
575   * @param conf The configuration object.
576   * @return The configured chore period or the default value in the given timeunit.
577   * @see #getTimeUnit(Configuration)
578   */
579  static int getPeriod(Configuration conf) {
580    return conf.getInt(QUOTA_OBSERVER_CHORE_PERIOD_KEY, QUOTA_OBSERVER_CHORE_PERIOD_DEFAULT);
581  }
582
583  /**
584   * Extracts the initial delay for the chore from the configuration.
585   * @param conf The configuration object.
586   * @return The configured chore initial delay or the default value in the given timeunit.
587   * @see #getTimeUnit(Configuration)
588   */
589  static long getInitialDelay(Configuration conf) {
590    return conf.getLong(QUOTA_OBSERVER_CHORE_DELAY_KEY, QUOTA_OBSERVER_CHORE_DELAY_DEFAULT);
591  }
592
593  /**
594   * Extracts the time unit for the chore period and initial delay from the configuration. The
595   * configuration value for {@link #QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY} must correspond to a
596   * {@link TimeUnit} value.
597   * @param conf The configuration object.
598   * @return The configured time unit for the chore period and initial delay or the default value.
599   */
600  static TimeUnit getTimeUnit(Configuration conf) {
601    return TimeUnit
602      .valueOf(conf.get(QUOTA_OBSERVER_CHORE_TIMEUNIT_KEY, QUOTA_OBSERVER_CHORE_TIMEUNIT_DEFAULT));
603  }
604
605  /**
606   * Extracts the percent of Regions for a table to have been reported to enable quota violation
607   * state change.
608   * @param conf The configuration object.
609   * @return The percent of regions reported to use.
610   */
611  static Double getRegionReportPercent(Configuration conf) {
612    return conf.getDouble(QUOTA_OBSERVER_CHORE_REPORT_PERCENT_KEY,
613      QUOTA_OBSERVER_CHORE_REPORT_PERCENT_DEFAULT);
614  }
615
616  /**
617   * A container which encapsulates the tables that have either a table quota or are contained in a
618   * namespace which have a namespace quota.
619   */
620  static class TablesWithQuotas {
621    private final Set<TableName> tablesWithTableQuotas = new HashSet<>();
622    private final Set<TableName> tablesWithNamespaceQuotas = new HashSet<>();
623    private final Connection conn;
624    private final Configuration conf;
625
626    public TablesWithQuotas(Connection conn, Configuration conf) {
627      this.conn = Objects.requireNonNull(conn);
628      this.conf = Objects.requireNonNull(conf);
629    }
630
631    Configuration getConfiguration() {
632      return conf;
633    }
634
635    /**
636     * Adds a table with a table quota.
637     */
638    public void addTableQuotaTable(TableName tn) {
639      tablesWithTableQuotas.add(tn);
640    }
641
642    /**
643     * Adds a table with a namespace quota.
644     */
645    public void addNamespaceQuotaTable(TableName tn) {
646      tablesWithNamespaceQuotas.add(tn);
647    }
648
649    /**
650     * Returns true if the given table has a table quota.
651     */
652    public boolean hasTableQuota(TableName tn) {
653      return tablesWithTableQuotas.contains(tn);
654    }
655
656    /**
657     * Returns true if the table exists in a namespace with a namespace quota.
658     */
659    public boolean hasNamespaceQuota(TableName tn) {
660      return tablesWithNamespaceQuotas.contains(tn);
661    }
662
663    /**
664     * Returns an unmodifiable view of all tables with table quotas.
665     */
666    public Set<TableName> getTableQuotaTables() {
667      return Collections.unmodifiableSet(tablesWithTableQuotas);
668    }
669
670    /**
671     * Returns an unmodifiable view of all tables in namespaces that have namespace quotas.
672     */
673    public Set<TableName> getNamespaceQuotaTables() {
674      return Collections.unmodifiableSet(tablesWithNamespaceQuotas);
675    }
676
677    public Set<String> getNamespacesWithQuotas() {
678      Set<String> namespaces = new HashSet<>();
679      for (TableName tn : tablesWithNamespaceQuotas) {
680        namespaces.add(tn.getNamespaceAsString());
681      }
682      return namespaces;
683    }
684
685    /**
686     * Returns a view of all tables that reside in a namespace with a namespace quota, grouped by
687     * the namespace itself.
688     */
689    public Multimap<String, TableName> getTablesByNamespace() {
690      Multimap<String, TableName> tablesByNS = HashMultimap.create();
691      for (TableName tn : tablesWithNamespaceQuotas) {
692        tablesByNS.put(tn.getNamespaceAsString(), tn);
693      }
694      return tablesByNS;
695    }
696
697    /**
698     * Filters out all tables for which the Master currently doesn't have enough region space
699     * reports received from RegionServers yet.
700     */
701    public Set<TableName> filterInsufficientlyReportedTables(
702      QuotaSnapshotStore<TableName> tableStore) throws IOException {
703      final double percentRegionsReportedThreshold = getRegionReportPercent(getConfiguration());
704      Set<TableName> tablesToRemove = new HashSet<>();
705      for (TableName table : Iterables.concat(tablesWithTableQuotas, tablesWithNamespaceQuotas)) {
706        // Don't recompute a table we've already computed
707        if (tablesToRemove.contains(table)) {
708          continue;
709        }
710        final int numRegionsInTable = getNumRegions(table);
711        // If the table doesn't exist (no regions), bail out.
712        if (numRegionsInTable == 0) {
713          if (LOG.isTraceEnabled()) {
714            LOG.trace("Filtering " + table + " because no regions were reported");
715          }
716          tablesToRemove.add(table);
717          continue;
718        }
719        final int reportedRegionsInQuota = getNumReportedRegions(table, tableStore);
720        final double ratioReported = ((double) reportedRegionsInQuota) / numRegionsInTable;
721        if (ratioReported < percentRegionsReportedThreshold) {
722          if (LOG.isTraceEnabled()) {
723            LOG.trace("Filtering " + table + " because " + reportedRegionsInQuota + " of "
724              + numRegionsInTable + " regions were reported.");
725          }
726          tablesToRemove.add(table);
727        } else if (LOG.isTraceEnabled()) {
728          LOG.trace("Retaining " + table + " because " + reportedRegionsInQuota + " of "
729            + numRegionsInTable + " regions were reported.");
730        }
731      }
732      for (TableName tableToRemove : tablesToRemove) {
733        tablesWithTableQuotas.remove(tableToRemove);
734        tablesWithNamespaceQuotas.remove(tableToRemove);
735      }
736      return tablesToRemove;
737    }
738
739    /**
740     * Computes the total number of regions in a table.
741     */
742    int getNumRegions(TableName table) throws IOException {
743      List<RegionInfo> regions = this.conn.getAdmin().getRegions(table);
744      if (regions == null) {
745        return 0;
746      }
747      // Filter the region replicas if any and return the original number of regions for a table.
748      RegionReplicaUtil.removeNonDefaultRegions(regions);
749      return regions.size();
750    }
751
752    /**
753     * Computes the number of regions reported for a table.
754     */
755    int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore)
756      throws IOException {
757      return Iterables.size(tableStore.filterBySubject(table));
758    }
759
760    @Override
761    public String toString() {
762      final StringBuilder sb = new StringBuilder(32);
763      sb.append(getClass().getSimpleName()).append(": tablesWithTableQuotas=")
764        .append(this.tablesWithTableQuotas).append(", tablesWithNamespaceQuotas=")
765        .append(this.tablesWithNamespaceQuotas);
766      return sb.toString();
767    }
768  }
769}