001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.DoNotRetryIOException;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.TableNotDisabledException;
032import org.apache.hadoop.hbase.TableNotEnabledException;
033import org.apache.hadoop.hbase.TableNotFoundException;
034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.Delete;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Mutation;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
044import org.apache.hadoop.hbase.regionserver.BloomType;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.Pair;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.apache.yetus.audience.InterfaceStability;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
061
062/**
063 * Helper class to interact with the quota table
064 */
065@InterfaceAudience.Private
066@InterfaceStability.Evolving
067public class QuotaUtil extends QuotaTableUtil {
068  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);
069
070  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
071  private static final boolean QUOTA_ENABLED_DEFAULT = false;
072
073  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
074  // the default one read capacity unit is 1024 bytes (1KB)
075  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
076  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
077  // the default one write capacity unit is 1024 bytes (1KB)
078  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
079
080  /*
081   * The below defaults, if configured, will be applied to otherwise unthrottled users. For example,
082   * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
083   * that any given user may not query more than 1mb per second from any given machine, unless
084   * explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and
085   * QuotaScope.MACHINE.
086   */
087  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
088    "hbase.quota.default.user.machine.read.num";
089  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
090    "hbase.quota.default.user.machine.read.size";
091  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
092    "hbase.quota.default.user.machine.request.num";
093  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
094    "hbase.quota.default.user.machine.request.size";
095  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
096    "hbase.quota.default.user.machine.write.num";
097  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
098    "hbase.quota.default.user.machine.write.size";
099  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE =
100    "hbase.quota.default.user.machine.atomic.read.size";
101  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM =
102    "hbase.quota.default.user.machine.atomic.request.num";
103  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE =
104    "hbase.quota.default.user.machine.atomic.write.size";
105
106  /** Table descriptor for Quota internal table */
107  public static final TableDescriptor QUOTA_TABLE_DESC =
108    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
109      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
110        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
111        .setMaxVersions(1).build())
112      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
113        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
114        .setMaxVersions(1).build())
115      .build();
116
117  /** Returns true if the support for quota is enabled */
118  public static boolean isQuotaEnabled(final Configuration conf) {
119    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
120  }
121
122  /*
123   * ========================================================================= Quota "settings"
124   * helpers
125   */
126  public static void addTableQuota(final Connection connection, final TableName table,
127    final Quotas data) throws IOException {
128    addQuotas(connection, getTableRowKey(table), data);
129  }
130
131  public static void deleteTableQuota(final Connection connection, final TableName table)
132    throws IOException {
133    deleteQuotas(connection, getTableRowKey(table));
134  }
135
136  public static void addNamespaceQuota(final Connection connection, final String namespace,
137    final Quotas data) throws IOException {
138    addQuotas(connection, getNamespaceRowKey(namespace), data);
139  }
140
141  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
142    throws IOException {
143    deleteQuotas(connection, getNamespaceRowKey(namespace));
144  }
145
146  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
147    throws IOException {
148    addQuotas(connection, getUserRowKey(user), data);
149  }
150
151  public static void addUserQuota(final Connection connection, final String user,
152    final TableName table, final Quotas data) throws IOException {
153    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
154  }
155
156  public static void addUserQuota(final Connection connection, final String user,
157    final String namespace, final Quotas data) throws IOException {
158    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
159      data);
160  }
161
162  public static void deleteUserQuota(final Connection connection, final String user)
163    throws IOException {
164    deleteQuotas(connection, getUserRowKey(user));
165  }
166
167  public static void deleteUserQuota(final Connection connection, final String user,
168    final TableName table) throws IOException {
169    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
170  }
171
172  public static void deleteUserQuota(final Connection connection, final String user,
173    final String namespace) throws IOException {
174    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
175  }
176
177  public static void addRegionServerQuota(final Connection connection, final String regionServer,
178    final Quotas data) throws IOException {
179    addQuotas(connection, getRegionServerRowKey(regionServer), data);
180  }
181
182  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
183    throws IOException {
184    deleteQuotas(connection, getRegionServerRowKey(regionServer));
185  }
186
187  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
188    boolean hasCondition) {
189    if (action.hasMutation()) {
190      return getQuotaOperationType(action.getMutation(), hasCondition);
191    }
192    return OperationQuota.OperationType.GET;
193  }
194
195  public static OperationQuota.OperationType
196    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
197    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
198  }
199
200  private static OperationQuota.OperationType
201    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
202    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
203    if (
204      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
205        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
206    ) {
207      return OperationQuota.OperationType.CHECK_AND_MUTATE;
208    }
209    return OperationQuota.OperationType.MUTATE;
210  }
211
212  protected static void switchExceedThrottleQuota(final Connection connection,
213    boolean exceedThrottleQuotaEnabled) throws IOException {
214    if (exceedThrottleQuotaEnabled) {
215      checkRSQuotaToEnableExceedThrottle(
216        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
217    }
218
219    Put put = new Put(getExceedThrottleQuotaRowKey());
220    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
221      Bytes.toBytes(exceedThrottleQuotaEnabled));
222    doPut(connection, put);
223  }
224
225  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
226    if (quotas != null && quotas.hasThrottle()) {
227      Throttle throttle = quotas.getThrottle();
228      // If enable exceed throttle quota, make sure that there are at least one read(req/read +
229      // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
230      boolean hasReadQuota = false;
231      boolean hasWriteQuota = false;
232      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
233        hasReadQuota = true;
234        hasWriteQuota = true;
235      }
236      if (
237        !hasReadQuota
238          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
239      ) {
240        hasReadQuota = true;
241      }
242      if (!hasReadQuota) {
243        throw new DoNotRetryIOException(
244          "Please set at least one read region server quota before enable exceed throttle quota");
245      }
246      if (
247        !hasWriteQuota
248          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
249      ) {
250        hasWriteQuota = true;
251      }
252      if (!hasWriteQuota) {
253        throw new DoNotRetryIOException("Please set at least one write region server quota "
254          + "before enable exceed throttle quota");
255      }
256      // If enable exceed throttle quota, make sure that region server throttle quotas are in
257      // seconds time unit. Because once previous requests exceed their quota and consume region
258      // server quota, quota in other time units may be refilled in a long time, this may affect
259      // later requests.
260      List<Pair<Boolean, TimedQuota>> list =
261        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
262          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
263          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
264          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
265          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
266          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
267          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
268          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
269          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
270      for (Pair<Boolean, TimedQuota> pair : list) {
271        if (pair.getFirst()) {
272          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
273            throw new DoNotRetryIOException("All region server quota must be "
274              + "in seconds time unit if enable exceed throttle quota");
275          }
276        }
277      }
278    } else {
279      // If enable exceed throttle quota, make sure that region server quota is already set
280      throw new DoNotRetryIOException(
281        "Please set region server quota before enable exceed throttle quota");
282    }
283  }
284
285  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
286    throws IOException {
287    Get get = new Get(getExceedThrottleQuotaRowKey());
288    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
289    Result result = doGet(connection, get);
290    if (result.isEmpty()) {
291      return false;
292    }
293    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
294  }
295
296  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
297    throws IOException {
298    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
299  }
300
301  private static void addQuotas(final Connection connection, final byte[] rowKey,
302    final byte[] qualifier, final Quotas data) throws IOException {
303    Put put = new Put(rowKey);
304    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
305    doPut(connection, put);
306  }
307
308  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
309    throws IOException {
310    deleteQuotas(connection, rowKey, null);
311  }
312
313  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
314    final byte[] qualifier) throws IOException {
315    Delete delete = new Delete(rowKey);
316    if (qualifier != null) {
317      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
318    }
319    if (isNamespaceRowKey(rowKey)) {
320      String ns = getNamespaceFromRowKey(rowKey);
321      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
322      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
323        // When deleting namespace space quota, also delete table usage(u:p) snapshots
324        deleteTableUsageSnapshotsForNamespace(connection, ns);
325      }
326    }
327    doDelete(connection, delete);
328  }
329
330  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
331    final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
332    throws IOException {
333    long nowTs = EnvironmentEdgeManager.currentTime();
334    Result[] results = doGet(connection, gets);
335
336    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
337    for (int i = 0; i < results.length; ++i) {
338      byte[] key = gets.get(i).getRow();
339      assert isUserRowKey(key);
340      String user = getUserFromRowKey(key);
341
342      if (results[i].isEmpty()) {
343        userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration(), nowTs));
344        continue;
345      }
346
347      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
348      userQuotas.put(user, quotaInfo);
349
350      assert Bytes.equals(key, results[i].getRow());
351
352      try {
353        parseUserResult(user, results[i], new UserQuotasVisitor() {
354          @Override
355          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
356            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
357            quotaInfo.setQuotas(namespace, quotas);
358          }
359
360          @Override
361          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
362            quotas = updateClusterQuotaToMachineQuota(quotas,
363              tableMachineQuotaFactors.containsKey(table)
364                ? tableMachineQuotaFactors.get(table)
365                : 1);
366            quotaInfo.setQuotas(table, quotas);
367          }
368
369          @Override
370          public void visitUserQuotas(String userName, Quotas quotas) {
371            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
372            quotaInfo.setQuotas(quotas);
373          }
374        });
375      } catch (IOException e) {
376        LOG.error("Unable to parse user '" + user + "' quotas", e);
377        userQuotas.remove(user);
378      }
379    }
380    return userQuotas;
381  }
382
383  protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf, long nowTs) {
384    QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();
385
386    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
387      .ifPresent(throttleBuilder::setReadNum);
388    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
389      .ifPresent(throttleBuilder::setReadSize);
390    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
391      .ifPresent(throttleBuilder::setReqNum);
392    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
393      .ifPresent(throttleBuilder::setReqSize);
394    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
395      .ifPresent(throttleBuilder::setWriteNum);
396    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
397      .ifPresent(throttleBuilder::setWriteSize);
398    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE)
399      .ifPresent(throttleBuilder::setAtomicReadSize);
400    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM)
401      .ifPresent(throttleBuilder::setAtomicReqNum);
402    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE)
403      .ifPresent(throttleBuilder::setAtomicWriteSize);
404
405    UserQuotaState state = new UserQuotaState(nowTs);
406    QuotaProtos.Quotas defaultQuotas =
407      QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
408    state.setQuotas(defaultQuotas);
409    return state;
410  }
411
412  private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
413    int defaultSoftLimit = conf.getInt(key, -1);
414    if (defaultSoftLimit == -1) {
415      return Optional.empty();
416    }
417    return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
418      java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
419  }
420
421  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
422    final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
423    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
424      @Override
425      public TableName getKeyFromRow(final byte[] row) {
426        assert isTableRowKey(row);
427        return getTableFromRowKey(row);
428      }
429
430      @Override
431      public double getFactor(TableName tableName) {
432        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
433      }
434    });
435  }
436
437  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
438    final List<Get> gets, double factor) throws IOException {
439    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
440      @Override
441      public String getKeyFromRow(final byte[] row) {
442        assert isNamespaceRowKey(row);
443        return getNamespaceFromRowKey(row);
444      }
445
446      @Override
447      public double getFactor(String s) {
448        return factor;
449      }
450    });
451  }
452
453  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
454    final List<Get> gets) throws IOException {
455    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
456      @Override
457      public String getKeyFromRow(final byte[] row) {
458        assert isRegionServerRowKey(row);
459        return getRegionServerFromRowKey(row);
460      }
461
462      @Override
463      public double getFactor(String s) {
464        return 1;
465      }
466    });
467  }
468
469  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
470    final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
471    long nowTs = EnvironmentEdgeManager.currentTime();
472    Result[] results = doGet(connection, gets);
473
474    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
475    for (int i = 0; i < results.length; ++i) {
476      byte[] row = gets.get(i).getRow();
477      K key = kfr.getKeyFromRow(row);
478
479      QuotaState quotaInfo = new QuotaState(nowTs);
480      globalQuotas.put(key, quotaInfo);
481
482      if (results[i].isEmpty()) continue;
483      assert Bytes.equals(row, results[i].getRow());
484
485      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
486      if (data == null) continue;
487
488      try {
489        Quotas quotas = quotasFromData(data);
490        quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
491        quotaInfo.setQuotas(quotas);
492      } catch (IOException e) {
493        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
494        globalQuotas.remove(key);
495      }
496    }
497    return globalQuotas;
498  }
499
500  /**
501   * Convert cluster scope quota to machine scope quota
502   * @param quotas the original quota
503   * @param factor factor used to divide cluster limiter to machine limiter
504   * @return the converted quota whose quota limiters all in machine scope
505   */
506  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
507    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
508    if (newQuotas.hasThrottle()) {
509      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
510      if (throttle.hasReqNum()) {
511        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
512      }
513      if (throttle.hasReqSize()) {
514        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
515      }
516      if (throttle.hasReadNum()) {
517        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
518      }
519      if (throttle.hasReadSize()) {
520        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
521      }
522      if (throttle.hasWriteNum()) {
523        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
524      }
525      if (throttle.hasWriteSize()) {
526        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
527      }
528      if (throttle.hasReqCapacityUnit()) {
529        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
530      }
531      if (throttle.hasReadCapacityUnit()) {
532        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
533      }
534      if (throttle.hasWriteCapacityUnit()) {
535        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
536      }
537      newQuotas.setThrottle(throttle.build());
538    }
539    return newQuotas.build();
540  }
541
542  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
543    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
544      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
545      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
546        .setScope(QuotaScope.MACHINE);
547      return newTimedQuota.build();
548    } else {
549      return timedQuota;
550    }
551  }
552
553  private static interface KeyFromRow<T> {
554    T getKeyFromRow(final byte[] row);
555
556    double getFactor(T t);
557  }
558
559  /*
560   * ========================================================================= HTable helpers
561   */
562  private static void doPut(final Connection connection, final Put put) throws IOException {
563    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
564      table.put(put);
565    }
566  }
567
568  private static void doDelete(final Connection connection, final Delete delete)
569    throws IOException {
570    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
571      table.delete(delete);
572    }
573  }
574
575  /*
576   * ========================================================================= Data Size Helpers
577   */
578  public static long calculateMutationSize(final Mutation mutation) {
579    long size = 0;
580    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
581      for (Cell cell : entry.getValue()) {
582        size += cell.getSerializedSize();
583      }
584    }
585    return size;
586  }
587
588  public static long calculateResultSize(final Result result) {
589    long size = 0;
590    for (Cell cell : result.rawCells()) {
591      size += cell.getSerializedSize();
592    }
593    return size;
594  }
595
596  public static long calculateResultSize(final List<Result> results) {
597    long size = 0;
598    for (Result result : results) {
599      for (Cell cell : result.rawCells()) {
600        size += cell.getSerializedSize();
601      }
602    }
603    return size;
604  }
605
606  public static long calculateCellsSize(final List<Cell> cells) {
607    long size = 0;
608    for (Cell cell : cells) {
609      size += cell.getSerializedSize();
610    }
611    return size;
612  }
613
614  /**
615   * Method to enable a table, if not already enabled. This method suppresses
616   * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
617   * the table.
618   * @param conn      connection to re-use
619   * @param tableName name of the table to be enabled
620   */
621  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
622    throws IOException {
623    try {
624      conn.getAdmin().enableTable(tableName);
625    } catch (TableNotDisabledException | TableNotFoundException e) {
626      // ignore
627    }
628  }
629
630  /**
631   * Method to disable a table, if not already disabled. This method suppresses
632   * {@link TableNotEnabledException}, if thrown while disabling the table.
633   * @param conn      connection to re-use
634   * @param tableName table name which has moved into space quota violation
635   */
636  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
637    throws IOException {
638    try {
639      conn.getAdmin().disableTable(tableName);
640    } catch (TableNotEnabledException | TableNotFoundException e) {
641      // ignore
642    }
643  }
644}