001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Iterator;
025import java.util.Map;
026import java.util.Map.Entry;
027import java.util.Optional;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import org.apache.commons.lang3.builder.HashCodeBuilder;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.NamespaceDescriptor;
035import org.apache.hadoop.hbase.RegionStateListener;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.RegionInfo;
039import org.apache.hadoop.hbase.master.MasterServices;
040import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
041import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
042import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
043import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
044import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.apache.yetus.audience.InterfaceStability;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
051import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
052import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
053
054import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;
065
066/**
067 * Master Quota Manager. It is responsible for initialize the quota table on the first-run and
068 * provide the admin operations to interact with the quota table. TODO: FUTURE: The master will be
069 * responsible to notify each RS of quota changes and it will do the "quota aggregation" when the
070 * QuotaScope is CLUSTER.
071 */
072@InterfaceAudience.Private
073@InterfaceStability.Evolving
074public class MasterQuotaManager implements RegionStateListener {
075  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
076  private static final Map<RegionInfo, Long> EMPTY_MAP =
077    Collections.unmodifiableMap(new HashMap<>());
078
079  private final MasterServices masterServices;
080  private NamedLock<String> namespaceLocks;
081  private NamedLock<TableName> tableLocks;
082  private NamedLock<String> userLocks;
083  private NamedLock<String> regionServerLocks;
084  private boolean initialized = false;
085  private NamespaceAuditor namespaceQuotaManager;
086  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
087  // Storage for quota rpc throttle
088  private RpcThrottleStorage rpcThrottleStorage;
089
090  public MasterQuotaManager(final MasterServices masterServices) {
091    this.masterServices = masterServices;
092  }
093
094  public void start() throws IOException {
095    // If the user doesn't want the quota support skip all the initializations.
096    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
097      LOG.info("Quota support disabled");
098      return;
099    }
100
101    // Create the quota table if missing
102    if (!masterServices.getTableDescriptors().exists(QuotaUtil.QUOTA_TABLE_NAME)) {
103      LOG.info("Quota table not found. Creating...");
104      createQuotaTable();
105    }
106
107    LOG.info("Initializing quota support");
108    namespaceLocks = new NamedLock<>();
109    tableLocks = new NamedLock<>();
110    userLocks = new NamedLock<>();
111    regionServerLocks = new NamedLock<>();
112    regionSizes = new ConcurrentHashMap<>();
113
114    namespaceQuotaManager = new NamespaceAuditor(masterServices);
115    namespaceQuotaManager.start();
116    initialized = true;
117
118    rpcThrottleStorage =
119      new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());
120  }
121
122  public void stop() {
123  }
124
125  public boolean isQuotaInitialized() {
126    return initialized && namespaceQuotaManager.isInitialized();
127  }
128
129  /*
130   * ========================================================================== Admin operations to
131   * manage the quota table
132   */
133  public SetQuotaResponse setQuota(final SetQuotaRequest req)
134    throws IOException, InterruptedException {
135    checkQuotaSupport();
136
137    if (req.hasUserName()) {
138      userLocks.lock(req.getUserName());
139      try {
140        if (req.hasTableName()) {
141          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
142        } else if (req.hasNamespace()) {
143          setUserQuota(req.getUserName(), req.getNamespace(), req);
144        } else {
145          setUserQuota(req.getUserName(), req);
146        }
147      } finally {
148        userLocks.unlock(req.getUserName());
149      }
150    } else if (req.hasTableName()) {
151      TableName table = ProtobufUtil.toTableName(req.getTableName());
152      tableLocks.lock(table);
153      try {
154        setTableQuota(table, req);
155      } finally {
156        tableLocks.unlock(table);
157      }
158    } else if (req.hasNamespace()) {
159      namespaceLocks.lock(req.getNamespace());
160      try {
161        setNamespaceQuota(req.getNamespace(), req);
162      } finally {
163        namespaceLocks.unlock(req.getNamespace());
164      }
165    } else if (req.hasRegionServer()) {
166      regionServerLocks.lock(req.getRegionServer());
167      try {
168        setRegionServerQuota(req.getRegionServer(), req);
169      } finally {
170        regionServerLocks.unlock(req.getRegionServer());
171      }
172    } else {
173      throw new DoNotRetryIOException(new UnsupportedOperationException(
174        "a user, a table, a namespace or region server must be specified"));
175    }
176    return SetQuotaResponse.newBuilder().build();
177  }
178
179  public void setUserQuota(final String userName, final SetQuotaRequest req)
180    throws IOException, InterruptedException {
181    setQuota(req, new SetQuotaOperations() {
182      @Override
183      public GlobalQuotaSettingsImpl fetch() throws IOException {
184        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
185          QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
186      }
187
188      @Override
189      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
190        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
191      }
192
193      @Override
194      public void delete() throws IOException {
195        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
196      }
197
198      @Override
199      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
200        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
201      }
202
203      @Override
204      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
205        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
206      }
207    });
208  }
209
210  public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
211    throws IOException, InterruptedException {
212    setQuota(req, new SetQuotaOperations() {
213      @Override
214      public GlobalQuotaSettingsImpl fetch() throws IOException {
215        return new GlobalQuotaSettingsImpl(userName, table, null, null,
216          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
217      }
218
219      @Override
220      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
221        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
222          quotaPojo.toQuotas());
223      }
224
225      @Override
226      public void delete() throws IOException {
227        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
228      }
229
230      @Override
231      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
232        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
233      }
234
235      @Override
236      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
237        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
238      }
239    });
240  }
241
242  public void setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
243    throws IOException, InterruptedException {
244    setQuota(req, new SetQuotaOperations() {
245      @Override
246      public GlobalQuotaSettingsImpl fetch() throws IOException {
247        return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
248          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
249      }
250
251      @Override
252      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
253        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
254          quotaPojo.toQuotas());
255      }
256
257      @Override
258      public void delete() throws IOException {
259        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
260      }
261
262      @Override
263      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
264        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotaPojo);
265      }
266
267      @Override
268      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
269        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotaPojo);
270      }
271    });
272  }
273
274  public void setTableQuota(final TableName table, final SetQuotaRequest req)
275    throws IOException, InterruptedException {
276    setQuota(req, new SetQuotaOperations() {
277      @Override
278      public GlobalQuotaSettingsImpl fetch() throws IOException {
279        return new GlobalQuotaSettingsImpl(null, table, null, null,
280          QuotaUtil.getTableQuota(masterServices.getConnection(), table));
281      }
282
283      @Override
284      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
285        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
286      }
287
288      @Override
289      public void delete() throws IOException {
290        SpaceQuotaSnapshot currSnapshotOfTable =
291          QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
292        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
293        if (currSnapshotOfTable != null) {
294          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
295          if (
296            SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
297              && quotaStatus.isInViolation()
298          ) {
299            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
300          }
301        }
302      }
303
304      @Override
305      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
306        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
307      }
308
309      @Override
310      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
311        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
312      }
313    });
314  }
315
316  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
317    throws IOException, InterruptedException {
318    setQuota(req, new SetQuotaOperations() {
319      @Override
320      public GlobalQuotaSettingsImpl fetch() throws IOException {
321        return new GlobalQuotaSettingsImpl(null, null, namespace, null,
322          QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
323      }
324
325      @Override
326      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
327        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
328          quotaPojo.toQuotas());
329      }
330
331      @Override
332      public void delete() throws IOException {
333        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
334      }
335
336      @Override
337      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
338        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
339      }
340
341      @Override
342      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
343        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
344      }
345    });
346  }
347
348  public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
349    throws IOException, InterruptedException {
350    setQuota(req, new SetQuotaOperations() {
351      @Override
352      public GlobalQuotaSettingsImpl fetch() throws IOException {
353        return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
354          QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
355      }
356
357      @Override
358      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
359        QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
360          quotaPojo.toQuotas());
361      }
362
363      @Override
364      public void delete() throws IOException {
365        QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
366      }
367
368      @Override
369      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
370        masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
371      }
372
373      @Override
374      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
375        masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
376      }
377    });
378  }
379
380  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
381    if (initialized) {
382      this.namespaceQuotaManager.addNamespace(desc);
383    }
384  }
385
386  public void removeNamespaceQuota(String namespace) throws IOException {
387    if (initialized) {
388      this.namespaceQuotaManager.deleteNamespace(namespace);
389    }
390  }
391
392  public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
393    throws IOException {
394    boolean rpcThrottle = request.getRpcThrottleEnabled();
395    if (initialized) {
396      masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
397      boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
398      if (rpcThrottle != oldRpcThrottle) {
399        LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
400          oldRpcThrottle, rpcThrottle);
401        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
402        SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
403          rpcThrottle, masterServices.getServerName(), latch);
404        masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
405        latch.await();
406      } else {
407        LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
408          rpcThrottle);
409      }
410      SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
411        .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
412      masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
413      return response;
414    } else {
415      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
416      return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
417    }
418  }
419
420  public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
421    throws IOException {
422    if (initialized) {
423      masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
424      boolean enabled = isRpcThrottleEnabled();
425      IsRpcThrottleEnabledResponse response =
426        IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
427      masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
428      return response;
429    } else {
430      LOG.warn("Skip get rpc throttle because rpc quota is disabled");
431      return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
432    }
433  }
434
435  public boolean isRpcThrottleEnabled() throws IOException {
436    return initialized ? rpcThrottleStorage.isRpcThrottleEnabled() : false;
437  }
438
439  public SwitchExceedThrottleQuotaResponse
440    switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
441    boolean enabled = request.getExceedThrottleQuotaEnabled();
442    if (initialized) {
443      masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
444      boolean previousEnabled =
445        QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
446      if (previousEnabled == enabled) {
447        LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
448          enabled);
449      } else {
450        QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
451        LOG.info("{} switch exceed throttle quota from {} to {}",
452          masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
453      }
454      SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
455        .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
456      masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
457        enabled);
458      return response;
459    } else {
460      LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
461      return SwitchExceedThrottleQuotaResponse.newBuilder()
462        .setPreviousExceedThrottleQuotaEnabled(false).build();
463    }
464  }
465
466  public boolean isExceedThrottleQuotaEnabled() throws IOException {
467    return initialized
468      ? QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection())
469      : false;
470  }
471
472  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
473    throws IOException, InterruptedException {
474    if (req.hasRemoveAll() && req.getRemoveAll() == true) {
475      quotaOps.preApply(null);
476      quotaOps.delete();
477      quotaOps.postApply(null);
478      return;
479    }
480
481    // Apply quota changes
482    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
483    if (LOG.isTraceEnabled()) {
484      LOG.trace(
485        "Current quota for request(" + TextFormat.shortDebugString(req) + "): " + currentQuota);
486    }
487    // Call the appropriate "pre" CP hook with the current quota value (may be null)
488    quotaOps.preApply(currentQuota);
489    // Translate the protobuf request back into a POJO
490    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
491    if (LOG.isTraceEnabled()) {
492      LOG.trace("Deserialized quota from request: " + newQuota);
493    }
494
495    // Merge the current quota settings with the new quota settings the user provided.
496    //
497    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
498    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
499    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
500    if (LOG.isTraceEnabled()) {
501      LOG.trace("Computed merged quota from current quota and user request: " + mergedQuota);
502    }
503
504    // Submit new changes
505    if (mergedQuota == null) {
506      quotaOps.delete();
507    } else {
508      quotaOps.update(mergedQuota);
509    }
510    // Advertise the final result via the "post" CP hook
511    quotaOps.postApply(mergedQuota);
512  }
513
514  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
515    if (initialized) {
516      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
517    }
518  }
519
520  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
521    if (initialized) {
522      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
523    }
524  }
525
526  /** Returns cached region count, or -1 if quota manager is disabled or table status not found */
527  public int getRegionCountOfTable(TableName tName) throws IOException {
528    if (initialized) {
529      return namespaceQuotaManager.getRegionCountOfTable(tName);
530    }
531    return -1;
532  }
533
534  @Override
535  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
536    if (initialized) {
537      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
538    }
539  }
540
541  @Override
542  public void onRegionSplit(RegionInfo hri) throws IOException {
543    if (initialized) {
544      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
545    }
546  }
547
548  /**
549   * Remove table from namespace quota.
550   * @param tName - The table name to update quota usage.
551   * @throws IOException Signals that an I/O exception has occurred.
552   */
553  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
554    if (initialized) {
555      namespaceQuotaManager.removeFromNamespaceUsage(tName);
556    }
557  }
558
559  public NamespaceAuditor getNamespaceQuotaManager() {
560    return this.namespaceQuotaManager;
561  }
562
563  /**
564   * Encapsulates CRUD quota operations for some subject.
565   */
566  private static interface SetQuotaOperations {
567    /**
568     * Fetches the current quota settings for the subject.
569     */
570    GlobalQuotaSettingsImpl fetch() throws IOException;
571
572    /**
573     * Deletes the quota for the subject.
574     */
575    void delete() throws IOException;
576
577    /**
578     * Persist the given quota for the subject.
579     */
580    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
581
582    /**
583     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current quota
584     * for the subject.
585     */
586    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
587
588    /**
589     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting quota
590     * from the request action for the subject.
591     */
592    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
593  }
594
595  /*
596   * ========================================================================== Helpers
597   */
598
599  private void checkQuotaSupport() throws IOException {
600    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
601      throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled"));
602    }
603    if (!initialized) {
604      long maxWaitTime = masterServices.getConfiguration()
605        .getLong("hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
606      long startTime = EnvironmentEdgeManager.currentTime();
607      do {
608        try {
609          Thread.sleep(100);
610        } catch (InterruptedException e) {
611          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
612          break;
613        }
614      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
615      if (!initialized) {
616        throw new IOException("Quota manager is uninitialized, please retry later.");
617      }
618    }
619  }
620
621  private void createQuotaTable() throws IOException {
622    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
623  }
624
625  private static class NamedLock<T> {
626    private final HashSet<T> locks = new HashSet<>();
627
628    public void lock(final T name) throws InterruptedException {
629      synchronized (locks) {
630        while (locks.contains(name)) {
631          locks.wait();
632        }
633        locks.add(name);
634      }
635    }
636
637    public void unlock(final T name) {
638      synchronized (locks) {
639        locks.remove(name);
640        locks.notifyAll();
641      }
642    }
643  }
644
645  @Override
646  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
647    if (initialized) {
648      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
649    }
650  }
651
652  /**
653   * Holds the size of a region at the given time, millis since the epoch.
654   */
655  private static class SizeSnapshotWithTimestamp {
656    private final long size;
657    private final long time;
658
659    public SizeSnapshotWithTimestamp(long size, long time) {
660      this.size = size;
661      this.time = time;
662    }
663
664    public long getSize() {
665      return size;
666    }
667
668    public long getTime() {
669      return time;
670    }
671
672    @Override
673    public boolean equals(Object o) {
674      if (o instanceof SizeSnapshotWithTimestamp) {
675        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
676        return size == other.size && time == other.time;
677      }
678      return false;
679    }
680
681    @Override
682    public int hashCode() {
683      HashCodeBuilder hcb = new HashCodeBuilder();
684      return hcb.append(size).append(time).toHashCode();
685    }
686
687    @Override
688    public String toString() {
689      StringBuilder sb = new StringBuilder(32);
690      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
691      sb.append("time=").append(time).append("}");
692      return sb.toString();
693    }
694  }
695
696  void initializeRegionSizes() {
697    assert regionSizes == null;
698    this.regionSizes = new ConcurrentHashMap<>();
699  }
700
701  public void addRegionSize(RegionInfo hri, long size, long time) {
702    if (regionSizes == null) {
703      return;
704    }
705    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
706  }
707
708  public Map<RegionInfo, Long> snapshotRegionSizes() {
709    if (regionSizes == null) {
710      return EMPTY_MAP;
711    }
712
713    Map<RegionInfo, Long> copy = new HashMap<>();
714    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
715      copy.put(entry.getKey(), entry.getValue().getSize());
716    }
717    return copy;
718  }
719
720  int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) {
721    if (regionSizes == null) {
722      return 0;
723    }
724    int numEntriesRemoved = 0;
725    Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator =
726      regionSizes.entrySet().iterator();
727    while (iterator.hasNext()) {
728      RegionInfo regionInfo = iterator.next().getKey();
729      long currentEntryTime = regionSizes.get(regionInfo).getTime();
730      // do not prune the entries if table is in violation and
731      // violation policy is disable to avoid cycle of enable/disable.
732      // Please refer HBASE-22012 for more details.
733      // prune entries older than time.
734      if (
735        currentEntryTime < timeToPruneBefore
736          && !isInViolationAndPolicyDisable(regionInfo.getTable(), quotaObserverChore)
737      ) {
738        iterator.remove();
739        numEntriesRemoved++;
740      }
741    }
742    return numEntriesRemoved;
743  }
744
745  /**
746   * Method to check if a table is in violation and policy set on table is DISABLE.
747   * @param tableName          tableName to check.
748   * @param quotaObserverChore QuotaObserverChore instance
749   * @return returns true if table is in violation and policy is disable else false.
750   */
751  private boolean isInViolationAndPolicyDisable(TableName tableName,
752    QuotaObserverChore quotaObserverChore) {
753    boolean isInViolationAtTable = false;
754    boolean isInViolationAtNamespace = false;
755    SpaceViolationPolicy tablePolicy = null;
756    SpaceViolationPolicy namespacePolicy = null;
757    // Get Current Snapshot for the given table
758    SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName);
759    SpaceQuotaSnapshot namespaceQuotaSnapshot =
760      quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString());
761    if (tableQuotaSnapshot != null) {
762      // check if table in violation
763      isInViolationAtTable = tableQuotaSnapshot.getQuotaStatus().isInViolation();
764      Optional<SpaceViolationPolicy> policy = tableQuotaSnapshot.getQuotaStatus().getPolicy();
765      if (policy.isPresent()) {
766        tablePolicy = policy.get();
767      }
768    }
769    if (namespaceQuotaSnapshot != null) {
770      // check namespace in violation
771      isInViolationAtNamespace = namespaceQuotaSnapshot.getQuotaStatus().isInViolation();
772      Optional<SpaceViolationPolicy> policy = namespaceQuotaSnapshot.getQuotaStatus().getPolicy();
773      if (policy.isPresent()) {
774        namespacePolicy = policy.get();
775      }
776    }
777    return (tablePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtTable)
778      || (namespacePolicy == SpaceViolationPolicy.DISABLE && isInViolationAtNamespace);
779  }
780
781  /**
782   * Removes each region size entry where the RegionInfo references the provided TableName.
783   * @param tableName tableName.
784   */
785  public void removeRegionSizesForTable(TableName tableName) {
786    regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
787  }
788
789  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
790    Configuration conf, FileSystem fs) throws IOException {
791    final HashMultimap<TableName, Entry<String, Long>> archivedFilesByTable = HashMultimap.create();
792    // Group the archived files by table
793    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
794      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
795      archivedFilesByTable.put(tn,
796        Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
797    }
798    if (LOG.isTraceEnabled()) {
799      LOG.trace("Grouped archived files by table: " + archivedFilesByTable);
800    }
801    // Report each set of files to the appropriate object
802    for (TableName tn : archivedFilesByTable.keySet()) {
803      final Set<Entry<String, Long>> filesWithSize = archivedFilesByTable.get(tn);
804      final FileArchiverNotifier notifier =
805        FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
806      notifier.addArchivedFiles(filesWithSize);
807    }
808  }
809}