001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.DoNotRetryIOException; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.TableNotDisabledException; 032import org.apache.hadoop.hbase.TableNotEnabledException; 033import org.apache.hadoop.hbase.TableNotFoundException; 034import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.Delete; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Mutation; 039import org.apache.hadoop.hbase.client.Put; 040import org.apache.hadoop.hbase.client.Result; 041import org.apache.hadoop.hbase.client.Table; 042import org.apache.hadoop.hbase.client.TableDescriptor; 043import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 044import org.apache.hadoop.hbase.regionserver.BloomType; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 047import org.apache.hadoop.hbase.util.Pair; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.yetus.audience.InterfaceStability; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota; 061 062/** 063 * Helper class to interact with the quota table 064 */ 065@InterfaceAudience.Private 066@InterfaceStability.Evolving 067public class QuotaUtil extends QuotaTableUtil { 068 private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class); 069 070 public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; 071 private static final boolean QUOTA_ENABLED_DEFAULT = false; 072 073 public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit"; 074 // the default one read capacity unit is 1024 bytes (1KB) 075 public static final long DEFAULT_READ_CAPACITY_UNIT = 1024; 076 public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit"; 077 // the default one write capacity unit is 1024 bytes (1KB) 078 public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024; 079 080 /* 081 * The below defaults, if configured, will be applied to otherwise unthrottled users. For example, 082 * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure 083 * that any given user may not query more than 1mb per second from any given machine, unless 084 * explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and 085 * QuotaScope.MACHINE. 086 */ 087 public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM = 088 "hbase.quota.default.user.machine.read.num"; 089 public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE = 090 "hbase.quota.default.user.machine.read.size"; 091 public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM = 092 "hbase.quota.default.user.machine.request.num"; 093 public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE = 094 "hbase.quota.default.user.machine.request.size"; 095 public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM = 096 "hbase.quota.default.user.machine.write.num"; 097 public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE = 098 "hbase.quota.default.user.machine.write.size"; 099 public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE = 100 "hbase.quota.default.user.machine.atomic.read.size"; 101 public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM = 102 "hbase.quota.default.user.machine.atomic.request.num"; 103 public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE = 104 "hbase.quota.default.user.machine.atomic.write.size"; 105 106 /** Table descriptor for Quota internal table */ 107 public static final TableDescriptor QUOTA_TABLE_DESC = 108 TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME) 109 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO) 110 .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW) 111 .setMaxVersions(1).build()) 112 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE) 113 .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW) 114 .setMaxVersions(1).build()) 115 .build(); 116 117 /** Returns true if the support for quota is enabled */ 118 public static boolean isQuotaEnabled(final Configuration conf) { 119 return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT); 120 } 121 122 /* 123 * ========================================================================= Quota "settings" 124 * helpers 125 */ 126 public static void addTableQuota(final Connection connection, final TableName table, 127 final Quotas data) throws IOException { 128 addQuotas(connection, getTableRowKey(table), data); 129 } 130 131 public static void deleteTableQuota(final Connection connection, final TableName table) 132 throws IOException { 133 deleteQuotas(connection, getTableRowKey(table)); 134 } 135 136 public static void addNamespaceQuota(final Connection connection, final String namespace, 137 final Quotas data) throws IOException { 138 addQuotas(connection, getNamespaceRowKey(namespace), data); 139 } 140 141 public static void deleteNamespaceQuota(final Connection connection, final String namespace) 142 throws IOException { 143 deleteQuotas(connection, getNamespaceRowKey(namespace)); 144 } 145 146 public static void addUserQuota(final Connection connection, final String user, final Quotas data) 147 throws IOException { 148 addQuotas(connection, getUserRowKey(user), data); 149 } 150 151 public static void addUserQuota(final Connection connection, final String user, 152 final TableName table, final Quotas data) throws IOException { 153 addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data); 154 } 155 156 public static void addUserQuota(final Connection connection, final String user, 157 final String namespace, final Quotas data) throws IOException { 158 addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace), 159 data); 160 } 161 162 public static void deleteUserQuota(final Connection connection, final String user) 163 throws IOException { 164 deleteQuotas(connection, getUserRowKey(user)); 165 } 166 167 public static void deleteUserQuota(final Connection connection, final String user, 168 final TableName table) throws IOException { 169 deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table)); 170 } 171 172 public static void deleteUserQuota(final Connection connection, final String user, 173 final String namespace) throws IOException { 174 deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace)); 175 } 176 177 public static void addRegionServerQuota(final Connection connection, final String regionServer, 178 final Quotas data) throws IOException { 179 addQuotas(connection, getRegionServerRowKey(regionServer), data); 180 } 181 182 public static void deleteRegionServerQuota(final Connection connection, final String regionServer) 183 throws IOException { 184 deleteQuotas(connection, getRegionServerRowKey(regionServer)); 185 } 186 187 public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action, 188 boolean hasCondition) { 189 if (action.hasMutation()) { 190 return getQuotaOperationType(action.getMutation(), hasCondition); 191 } 192 return OperationQuota.OperationType.GET; 193 } 194 195 public static OperationQuota.OperationType 196 getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) { 197 return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition()); 198 } 199 200 private static OperationQuota.OperationType 201 getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) { 202 ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType(); 203 if ( 204 hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND 205 || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT 206 ) { 207 return OperationQuota.OperationType.CHECK_AND_MUTATE; 208 } 209 return OperationQuota.OperationType.MUTATE; 210 } 211 212 protected static void switchExceedThrottleQuota(final Connection connection, 213 boolean exceedThrottleQuotaEnabled) throws IOException { 214 if (exceedThrottleQuotaEnabled) { 215 checkRSQuotaToEnableExceedThrottle( 216 getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY)); 217 } 218 219 Put put = new Put(getExceedThrottleQuotaRowKey()); 220 put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS, 221 Bytes.toBytes(exceedThrottleQuotaEnabled)); 222 doPut(connection, put); 223 } 224 225 private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException { 226 if (quotas != null && quotas.hasThrottle()) { 227 Throttle throttle = quotas.getThrottle(); 228 // If enable exceed throttle quota, make sure that there are at least one read(req/read + 229 // num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas. 230 boolean hasReadQuota = false; 231 boolean hasWriteQuota = false; 232 if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) { 233 hasReadQuota = true; 234 hasWriteQuota = true; 235 } 236 if ( 237 !hasReadQuota 238 && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit()) 239 ) { 240 hasReadQuota = true; 241 } 242 if (!hasReadQuota) { 243 throw new DoNotRetryIOException( 244 "Please set at least one read region server quota before enable exceed throttle quota"); 245 } 246 if ( 247 !hasWriteQuota 248 && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit()) 249 ) { 250 hasWriteQuota = true; 251 } 252 if (!hasWriteQuota) { 253 throw new DoNotRetryIOException("Please set at least one write region server quota " 254 + "before enable exceed throttle quota"); 255 } 256 // If enable exceed throttle quota, make sure that region server throttle quotas are in 257 // seconds time unit. Because once previous requests exceed their quota and consume region 258 // server quota, quota in other time units may be refilled in a long time, this may affect 259 // later requests. 260 List<Pair<Boolean, TimedQuota>> list = 261 Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()), 262 Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()), 263 Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()), 264 Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()), 265 Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()), 266 Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()), 267 Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()), 268 Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()), 269 Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit())); 270 for (Pair<Boolean, TimedQuota> pair : list) { 271 if (pair.getFirst()) { 272 if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) { 273 throw new DoNotRetryIOException("All region server quota must be " 274 + "in seconds time unit if enable exceed throttle quota"); 275 } 276 } 277 } 278 } else { 279 // If enable exceed throttle quota, make sure that region server quota is already set 280 throw new DoNotRetryIOException( 281 "Please set region server quota before enable exceed throttle quota"); 282 } 283 } 284 285 protected static boolean isExceedThrottleQuotaEnabled(final Connection connection) 286 throws IOException { 287 Get get = new Get(getExceedThrottleQuotaRowKey()); 288 get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 289 Result result = doGet(connection, get); 290 if (result.isEmpty()) { 291 return false; 292 } 293 return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS)); 294 } 295 296 private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data) 297 throws IOException { 298 addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data); 299 } 300 301 private static void addQuotas(final Connection connection, final byte[] rowKey, 302 final byte[] qualifier, final Quotas data) throws IOException { 303 Put put = new Put(rowKey); 304 put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data)); 305 doPut(connection, put); 306 } 307 308 private static void deleteQuotas(final Connection connection, final byte[] rowKey) 309 throws IOException { 310 deleteQuotas(connection, rowKey, null); 311 } 312 313 private static void deleteQuotas(final Connection connection, final byte[] rowKey, 314 final byte[] qualifier) throws IOException { 315 Delete delete = new Delete(rowKey); 316 if (qualifier != null) { 317 delete.addColumns(QUOTA_FAMILY_INFO, qualifier); 318 } 319 if (isNamespaceRowKey(rowKey)) { 320 String ns = getNamespaceFromRowKey(rowKey); 321 Quotas namespaceQuota = getNamespaceQuota(connection, ns); 322 if (namespaceQuota != null && namespaceQuota.hasSpace()) { 323 // When deleting namespace space quota, also delete table usage(u:p) snapshots 324 deleteTableUsageSnapshotsForNamespace(connection, ns); 325 } 326 } 327 doDelete(connection, delete); 328 } 329 330 public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection, 331 final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor) 332 throws IOException { 333 long nowTs = EnvironmentEdgeManager.currentTime(); 334 Result[] results = doGet(connection, gets); 335 336 Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length); 337 for (int i = 0; i < results.length; ++i) { 338 byte[] key = gets.get(i).getRow(); 339 assert isUserRowKey(key); 340 String user = getUserFromRowKey(key); 341 342 if (results[i].isEmpty()) { 343 userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration(), nowTs)); 344 continue; 345 } 346 347 final UserQuotaState quotaInfo = new UserQuotaState(nowTs); 348 userQuotas.put(user, quotaInfo); 349 350 assert Bytes.equals(key, results[i].getRow()); 351 352 try { 353 parseUserResult(user, results[i], new UserQuotasVisitor() { 354 @Override 355 public void visitUserQuotas(String userName, String namespace, Quotas quotas) { 356 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 357 quotaInfo.setQuotas(namespace, quotas); 358 } 359 360 @Override 361 public void visitUserQuotas(String userName, TableName table, Quotas quotas) { 362 quotas = updateClusterQuotaToMachineQuota(quotas, 363 tableMachineQuotaFactors.containsKey(table) 364 ? tableMachineQuotaFactors.get(table) 365 : 1); 366 quotaInfo.setQuotas(table, quotas); 367 } 368 369 @Override 370 public void visitUserQuotas(String userName, Quotas quotas) { 371 quotas = updateClusterQuotaToMachineQuota(quotas, factor); 372 quotaInfo.setQuotas(quotas); 373 } 374 }); 375 } catch (IOException e) { 376 LOG.error("Unable to parse user '" + user + "' quotas", e); 377 userQuotas.remove(user); 378 } 379 } 380 return userQuotas; 381 } 382 383 protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf, long nowTs) { 384 QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder(); 385 386 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM) 387 .ifPresent(throttleBuilder::setReadNum); 388 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE) 389 .ifPresent(throttleBuilder::setReadSize); 390 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM) 391 .ifPresent(throttleBuilder::setReqNum); 392 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE) 393 .ifPresent(throttleBuilder::setReqSize); 394 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM) 395 .ifPresent(throttleBuilder::setWriteNum); 396 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE) 397 .ifPresent(throttleBuilder::setWriteSize); 398 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE) 399 .ifPresent(throttleBuilder::setAtomicReadSize); 400 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM) 401 .ifPresent(throttleBuilder::setAtomicReqNum); 402 buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE) 403 .ifPresent(throttleBuilder::setAtomicWriteSize); 404 405 UserQuotaState state = new UserQuotaState(nowTs); 406 QuotaProtos.Quotas defaultQuotas = 407 QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build(); 408 state.setQuotas(defaultQuotas); 409 return state; 410 } 411 412 private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) { 413 int defaultSoftLimit = conf.getInt(key, -1); 414 if (defaultSoftLimit == -1) { 415 return Optional.empty(); 416 } 417 return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit, 418 java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE)); 419 } 420 421 public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection, 422 final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException { 423 return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() { 424 @Override 425 public TableName getKeyFromRow(final byte[] row) { 426 assert isTableRowKey(row); 427 return getTableFromRowKey(row); 428 } 429 430 @Override 431 public double getFactor(TableName tableName) { 432 return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1; 433 } 434 }); 435 } 436 437 public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection, 438 final List<Get> gets, double factor) throws IOException { 439 return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() { 440 @Override 441 public String getKeyFromRow(final byte[] row) { 442 assert isNamespaceRowKey(row); 443 return getNamespaceFromRowKey(row); 444 } 445 446 @Override 447 public double getFactor(String s) { 448 return factor; 449 } 450 }); 451 } 452 453 public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection, 454 final List<Get> gets) throws IOException { 455 return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() { 456 @Override 457 public String getKeyFromRow(final byte[] row) { 458 assert isRegionServerRowKey(row); 459 return getRegionServerFromRowKey(row); 460 } 461 462 @Override 463 public double getFactor(String s) { 464 return 1; 465 } 466 }); 467 } 468 469 public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type, 470 final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException { 471 long nowTs = EnvironmentEdgeManager.currentTime(); 472 Result[] results = doGet(connection, gets); 473 474 Map<K, QuotaState> globalQuotas = new HashMap<>(results.length); 475 for (int i = 0; i < results.length; ++i) { 476 byte[] row = gets.get(i).getRow(); 477 K key = kfr.getKeyFromRow(row); 478 479 QuotaState quotaInfo = new QuotaState(nowTs); 480 globalQuotas.put(key, quotaInfo); 481 482 if (results[i].isEmpty()) continue; 483 assert Bytes.equals(row, results[i].getRow()); 484 485 byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS); 486 if (data == null) continue; 487 488 try { 489 Quotas quotas = quotasFromData(data); 490 quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key)); 491 quotaInfo.setQuotas(quotas); 492 } catch (IOException e) { 493 LOG.error("Unable to parse " + type + " '" + key + "' quotas", e); 494 globalQuotas.remove(key); 495 } 496 } 497 return globalQuotas; 498 } 499 500 /** 501 * Convert cluster scope quota to machine scope quota 502 * @param quotas the original quota 503 * @param factor factor used to divide cluster limiter to machine limiter 504 * @return the converted quota whose quota limiters all in machine scope 505 */ 506 private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) { 507 Quotas.Builder newQuotas = Quotas.newBuilder(quotas); 508 if (newQuotas.hasThrottle()) { 509 Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle()); 510 if (throttle.hasReqNum()) { 511 throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor)); 512 } 513 if (throttle.hasReqSize()) { 514 throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor)); 515 } 516 if (throttle.hasReadNum()) { 517 throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor)); 518 } 519 if (throttle.hasReadSize()) { 520 throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor)); 521 } 522 if (throttle.hasWriteNum()) { 523 throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor)); 524 } 525 if (throttle.hasWriteSize()) { 526 throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor)); 527 } 528 if (throttle.hasReqCapacityUnit()) { 529 throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor)); 530 } 531 if (throttle.hasReadCapacityUnit()) { 532 throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor)); 533 } 534 if (throttle.hasWriteCapacityUnit()) { 535 throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor)); 536 } 537 newQuotas.setThrottle(throttle.build()); 538 } 539 return newQuotas.build(); 540 } 541 542 private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) { 543 if (timedQuota.getScope() == QuotaScope.CLUSTER) { 544 TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota); 545 newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor))) 546 .setScope(QuotaScope.MACHINE); 547 return newTimedQuota.build(); 548 } else { 549 return timedQuota; 550 } 551 } 552 553 private static interface KeyFromRow<T> { 554 T getKeyFromRow(final byte[] row); 555 556 double getFactor(T t); 557 } 558 559 /* 560 * ========================================================================= HTable helpers 561 */ 562 private static void doPut(final Connection connection, final Put put) throws IOException { 563 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 564 table.put(put); 565 } 566 } 567 568 private static void doDelete(final Connection connection, final Delete delete) 569 throws IOException { 570 try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 571 table.delete(delete); 572 } 573 } 574 575 /* 576 * ========================================================================= Data Size Helpers 577 */ 578 public static long calculateMutationSize(final Mutation mutation) { 579 long size = 0; 580 for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) { 581 for (Cell cell : entry.getValue()) { 582 size += cell.getSerializedSize(); 583 } 584 } 585 return size; 586 } 587 588 public static long calculateResultSize(final Result result) { 589 long size = 0; 590 for (Cell cell : result.rawCells()) { 591 size += cell.getSerializedSize(); 592 } 593 return size; 594 } 595 596 public static long calculateResultSize(final List<Result> results) { 597 long size = 0; 598 for (Result result : results) { 599 for (Cell cell : result.rawCells()) { 600 size += cell.getSerializedSize(); 601 } 602 } 603 return size; 604 } 605 606 public static long calculateCellsSize(final List<Cell> cells) { 607 long size = 0; 608 for (Cell cell : cells) { 609 size += cell.getSerializedSize(); 610 } 611 return size; 612 } 613 614 /** 615 * Method to enable a table, if not already enabled. This method suppresses 616 * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling 617 * the table. 618 * @param conn connection to re-use 619 * @param tableName name of the table to be enabled 620 */ 621 public static void enableTableIfNotEnabled(Connection conn, TableName tableName) 622 throws IOException { 623 try { 624 conn.getAdmin().enableTable(tableName); 625 } catch (TableNotDisabledException | TableNotFoundException e) { 626 // ignore 627 } 628 } 629 630 /** 631 * Method to disable a table, if not already disabled. This method suppresses 632 * {@link TableNotEnabledException}, if thrown while disabling the table. 633 * @param conn connection to re-use 634 * @param tableName table name which has moved into space quota violation 635 */ 636 public static void disableTableIfNotDisabled(Connection conn, TableName tableName) 637 throws IOException { 638 try { 639 conn.getAdmin().disableTable(tableName); 640 } catch (TableNotEnabledException | TableNotFoundException e) { 641 // ignore 642 } 643 } 644}