/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure;
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize;

/**
 * Master Quota Manager. It is responsible for initializing the quota table on the first run and
 * for providing the admin operations that interact with the quota table. TODO: FUTURE: The master
 * will be responsible to notify each RS of quota changes and it will do the "quota aggregation"
 * when the QuotaScope is CLUSTER.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterQuotaManager implements RegionStateListener {
  private static final Logger LOG = LoggerFactory.getLogger(MasterQuotaManager.class);
  /** Immutable result handed out by {@link #snapshotRegionSizes()} when no sizes are tracked. */
  private static final Map<RegionInfo, Long> EMPTY_MAP = Collections.emptyMap();

  private final MasterServices masterServices;
  private NamedLock<String> namespaceLocks;
  private NamedLock<TableName> tableLocks;
  private NamedLock<String> userLocks;
  private NamedLock<String> regionServerLocks;
  // Written once at the very end of start() and polled by checkQuotaSupport() from other threads.
  // Being volatile, a reader that observes 'true' also observes every field start() assigned
  // before it.
  private volatile boolean initialized = false;
  private NamespaceAuditor namespaceQuotaManager;
  private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
  // Storage for quota rpc throttle
  private RpcThrottleStorage rpcThrottleStorage;

  public MasterQuotaManager(final MasterServices masterServices) {
    this.masterServices = masterServices;
  }

  /**
   * Initializes quota support: creates the quota table when it does not exist yet, sets up the
   * per-subject locks and the region size map, starts the {@link NamespaceAuditor} and creates the
   * RPC throttle storage. Does nothing (besides logging) when quota support is disabled in the
   * configuration.
   * @throws IOException if the quota table cannot be created or the namespace auditor fails to
   *                     start
   */
  public void start() throws IOException {
    // If the user doesn't want the quota support skip all the initializations.
    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
      LOG.info("Quota support disabled");
      return;
    }

    // Create the quota table if missing
    if (!masterServices.getTableDescriptors().exists(QuotaUtil.QUOTA_TABLE_NAME)) {
      LOG.info("Quota table not found. Creating...");
      createQuotaTable();
    }

    LOG.info("Initializing quota support");
    namespaceLocks = new NamedLock<>();
    tableLocks = new NamedLock<>();
    userLocks = new NamedLock<>();
    regionServerLocks = new NamedLock<>();
    regionSizes = new ConcurrentHashMap<>();

    namespaceQuotaManager = new NamespaceAuditor(masterServices);
    namespaceQuotaManager.start();

    rpcThrottleStorage =
      new RpcThrottleStorage(masterServices.getZooKeeper(), masterServices.getConfiguration());

    // Publish the flag last: methods guarded by 'initialized' dereference rpcThrottleStorage and
    // the other fields above, so none of them may still be null once the flag reads true.
    initialized = true;
  }

  /** Currently a no-op. */
  public void stop() {
  }

  /**
   * Returns true when {@link #start()} completed its initialization and the namespace auditor
   * reports itself as initialized.
   */
  public boolean isQuotaInitialized() {
    return initialized && namespaceQuotaManager.isInitialized();
  }

  /*
   * ========================================================================== Admin operations to
   * manage the quota table
   */

  /**
   * Applies a quota request to the subject it names. The subject is chosen in this order of
   * precedence: user (optionally narrowed by table or namespace), table, namespace, region server.
   * The update runs while holding the named lock of the chosen subject.
   * @param req the quota request
   * @return an empty response
   * @throws DoNotRetryIOException if quota support is disabled or the request names no subject
   * @throws IOException           if the quota manager is not initialized in time or the update
   *                               fails
   * @throws InterruptedException  if interrupted while waiting for the subject lock
   */
  public SetQuotaResponse setQuota(final SetQuotaRequest req)
    throws IOException, InterruptedException {
    checkQuotaSupport();

    if (req.hasUserName()) {
      userLocks.lock(req.getUserName());
      try {
        if (req.hasTableName()) {
          setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
        } else if (req.hasNamespace()) {
          setUserQuota(req.getUserName(), req.getNamespace(), req);
        } else {
          setUserQuota(req.getUserName(), req);
        }
      } finally {
        userLocks.unlock(req.getUserName());
      }
    } else if (req.hasTableName()) {
      TableName table = ProtobufUtil.toTableName(req.getTableName());
      tableLocks.lock(table);
      try {
        setTableQuota(table, req);
      } finally {
        tableLocks.unlock(table);
      }
    } else if (req.hasNamespace()) {
      namespaceLocks.lock(req.getNamespace());
      try {
        setNamespaceQuota(req.getNamespace(), req);
      } finally {
        namespaceLocks.unlock(req.getNamespace());
      }
    } else if (req.hasRegionServer()) {
      regionServerLocks.lock(req.getRegionServer());
      try {
        setRegionServerQuota(req.getRegionServer(), req);
      } finally {
        regionServerLocks.unlock(req.getRegionServer());
      }
    } else {
      throw new DoNotRetryIOException(new UnsupportedOperationException(
        "a user, a table, a namespace or region server must be specified"));
    }
    return SetQuotaResponse.newBuilder().build();
  }

  /**
   * Applies the request to the quota of the given user.
   * @param userName the user whose quota is read, written and reported to the coprocessor hooks
   * @param req      the quota request
   */
  public void setUserQuota(final String userName, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        // NOTE(review): the settings are labelled with req.getUserName() rather than userName,
        // unlike the sibling overloads. The two are identical when called from
        // setQuota(SetQuotaRequest); confirm before changing.
        return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
          QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given user on the given table.
   * @param userName the user the quota belongs to
   * @param table    the table the user quota is scoped to
   * @param req      the quota request
   */
  public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(userName, table, null, null,
          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given user on the given namespace.
   * @param userName  the user the quota belongs to
   * @param namespace the namespace the user quota is scoped to
   * @param req       the quota request
   */
  public void setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
          QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given table. When the table quota ends up deleted and
   * the table's current space quota snapshot shows it in violation with the DISABLE policy, the
   * table is re-enabled if it is not enabled.
   * @param table the table the quota belongs to
   * @param req   the quota request
   */
  public void setTableQuota(final TableName table, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, table, null, null,
          QuotaUtil.getTableQuota(masterServices.getConnection(), table));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        // Read the snapshot before deleting the quota so the violation state is still available.
        SpaceQuotaSnapshot currSnapshotOfTable =
          QuotaTableUtil.getCurrentSnapshotFromQuotaTable(masterServices.getConnection(), table);
        QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
        if (currSnapshotOfTable != null) {
          SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
          if (
            SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)
              && quotaStatus.isInViolation()
          ) {
            QuotaUtil.enableTableIfNotEnabled(masterServices.getConnection(), table);
          }
        }
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given namespace.
   * @param namespace the namespace the quota belongs to
   * @param req       the quota request
   */
  public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, null, namespace, null,
          QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotaPojo);
      }
    });
  }

  /**
   * Applies the request to the quota of the given region server.
   * @param regionServer the region server key the quota belongs to
   * @param req          the quota request
   */
  public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
    throws IOException, InterruptedException {
    setQuota(req, new SetQuotaOperations() {
      @Override
      public GlobalQuotaSettingsImpl fetch() throws IOException {
        return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
          QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
      }

      @Override
      public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
          quotaPojo.toQuotas());
      }

      @Override
      public void delete() throws IOException {
        QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
      }

      @Override
      public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
      }

      @Override
      public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
        masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
      }
    });
  }

  /** Registers the namespace with the namespace auditor; a no-op when not initialized. */
  public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.addNamespace(desc);
    }
  }

  /** Removes the namespace from the namespace auditor; a no-op when not initialized. */
  public void removeNamespaceQuota(String namespace) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.deleteNamespace(namespace);
    }
  }

  /**
   * Switches the RPC throttle on or off. When the requested state differs from the stored one, a
   * {@link SwitchRpcThrottleProcedure} is submitted and this call blocks until its prepare latch
   * is released. The coprocessor pre/post hooks are invoked around the switch.
   * @param request carries the desired throttle state
   * @return a response holding the previous throttle state; {@code false} when this manager is
   *         not initialized
   */
  public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request)
    throws IOException {
    boolean rpcThrottle = request.getRpcThrottleEnabled();
    if (initialized) {
      masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(rpcThrottle);
      boolean oldRpcThrottle = rpcThrottleStorage.isRpcThrottleEnabled();
      if (rpcThrottle != oldRpcThrottle) {
        LOG.info("{} switch rpc throttle from {} to {}", masterServices.getClientIdAuditPrefix(),
          oldRpcThrottle, rpcThrottle);
        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
        SwitchRpcThrottleProcedure procedure = new SwitchRpcThrottleProcedure(rpcThrottleStorage,
          rpcThrottle, masterServices.getServerName(), latch);
        masterServices.getMasterProcedureExecutor().submitProcedure(procedure);
        latch.await();
      } else {
        LOG.warn("Skip switch rpc throttle to {} because it's the same with old value",
          rpcThrottle);
      }
      SwitchRpcThrottleResponse response = SwitchRpcThrottleResponse.newBuilder()
        .setPreviousRpcThrottleEnabled(oldRpcThrottle).build();
      masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldRpcThrottle, rpcThrottle);
      return response;
    } else {
      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", rpcThrottle);
      return SwitchRpcThrottleResponse.newBuilder().setPreviousRpcThrottleEnabled(false).build();
    }
  }

  /**
   * RPC-facing variant of {@link #isRpcThrottleEnabled()} that also invokes the coprocessor
   * pre/post hooks. Reports {@code false} when this manager is not initialized.
   */
  public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(IsRpcThrottleEnabledRequest request)
    throws IOException {
    if (initialized) {
      masterServices.getMasterCoprocessorHost().preIsRpcThrottleEnabled();
      boolean enabled = isRpcThrottleEnabled();
      IsRpcThrottleEnabledResponse response =
        IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(enabled).build();
      masterServices.getMasterCoprocessorHost().postIsRpcThrottleEnabled(enabled);
      return response;
    } else {
      LOG.warn("Skip get rpc throttle because rpc quota is disabled");
      return IsRpcThrottleEnabledResponse.newBuilder().setRpcThrottleEnabled(false).build();
    }
  }

  /** Returns the stored RPC throttle state, or {@code false} when not initialized. */
  public boolean isRpcThrottleEnabled() throws IOException {
    return initialized && rpcThrottleStorage.isRpcThrottleEnabled();
  }

  /**
   * Switches the "exceed throttle quota" setting. The stored value is only rewritten when it
   * differs from the requested one; the coprocessor pre/post hooks are invoked either way.
   * @param request carries the desired state
   * @return a response holding the previous state; {@code false} when this manager is not
   *         initialized
   */
  public SwitchExceedThrottleQuotaResponse
    switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
    boolean enabled = request.getExceedThrottleQuotaEnabled();
    if (initialized) {
      masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
      boolean previousEnabled =
        QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
      if (previousEnabled == enabled) {
        LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
          enabled);
      } else {
        QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
        LOG.info("{} switch exceed throttle quota from {} to {}",
          masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
      }
      SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
        .setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
      masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
        enabled);
      return response;
    } else {
      LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
      return SwitchExceedThrottleQuotaResponse.newBuilder()
        .setPreviousExceedThrottleQuotaEnabled(false).build();
    }
  }

  /** Returns the "exceed throttle quota" setting, or {@code false} when not initialized. */
  public boolean isExceedThrottleQuotaEnabled() throws IOException {
    return initialized && QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
  }

  /**
   * Shared implementation behind the public set*Quota methods. A remove-all request deletes the
   * quota, calling both hooks with {@code null}. Otherwise the current quota is fetched, merged
   * with the requested settings and the merge result is persisted, or the quota is deleted when
   * the merge yields {@code null}.
   * @param req      the quota request
   * @param quotaOps the subject-specific fetch/update/delete operations and coprocessor hooks
   */
  private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
    throws IOException, InterruptedException {
    if (req.hasRemoveAll() && req.getRemoveAll()) {
      quotaOps.preApply(null);
      quotaOps.delete();
      quotaOps.postApply(null);
      return;
    }

    // Apply quota changes
    GlobalQuotaSettingsImpl currentQuota = quotaOps.fetch();
    if (LOG.isTraceEnabled()) {
      // Guarded because rendering the request eagerly is not free.
      LOG.trace("Current quota for request({}): {}", TextFormat.shortDebugString(req),
        currentQuota);
    }
    // Call the appropriate "pre" CP hook with the current quota value
    quotaOps.preApply(currentQuota);
    // Translate the protobuf request back into a POJO
    QuotaSettings newQuota = QuotaSettings.buildFromProto(req);
    LOG.trace("Deserialized quota from request: {}", newQuota);

    // Merge the current quota settings with the new quota settings the user provided.
    //
    // NB: while SetQuotaRequest technically allows for multi types of quotas to be set in one
    // message, the Java API (in Admin/AsyncAdmin) does not. Assume there is only one type.
    GlobalQuotaSettingsImpl mergedQuota = currentQuota.merge(newQuota);
    LOG.trace("Computed merged quota from current quota and user request: {}", mergedQuota);

    // Submit new changes
    if (mergedQuota == null) {
      quotaOps.delete();
    } else {
      quotaOps.update(mergedQuota);
    }
    // Advertise the final result via the "post" CP hook
    quotaOps.postApply(mergedQuota);
  }

  /** Delegates the create-table quota check to the namespace auditor; no-op if uninitialized. */
  public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
    }
  }

  /** Delegates the update-region quota check to the namespace auditor; no-op if uninitialized. */
  public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
    }
  }

  /** Returns cached region count, or -1 if quota manager is disabled or table status not found */
  public int getRegionCountOfTable(TableName tName) throws IOException {
    if (initialized) {
      return namespaceQuotaManager.getRegionCountOfTable(tName);
    }
    return -1;
  }

  @Override
  public void onRegionMerged(RegionInfo mergedRegion) throws IOException {
    if (initialized) {
      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
    }
  }

  @Override
  public void onRegionSplit(RegionInfo hri) throws IOException {
    if (initialized) {
      namespaceQuotaManager.checkQuotaToSplitRegion(hri);
    }
  }

  /**
   * Remove table from namespace quota.
   * @param tName - The table name to update quota usage.
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
    if (initialized) {
      namespaceQuotaManager.removeFromNamespaceUsage(tName);
    }
  }

  /** Returns the namespace auditor, which is {@code null} until {@link #start()} creates it. */
  public NamespaceAuditor getNamespaceQuotaManager() {
    return this.namespaceQuotaManager;
  }

  /**
   * Encapsulates CRUD quota operations for some subject.
   */
  private static interface SetQuotaOperations {
    /**
     * Fetches the current quota settings for the subject.
     */
    GlobalQuotaSettingsImpl fetch() throws IOException;

    /**
     * Deletes the quota for the subject.
     */
    void delete() throws IOException;

    /**
     * Persist the given quota for the subject.
     */
    void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException;

    /**
     * Performs some action before {@link #update(GlobalQuotaSettingsImpl)} with the current quota
     * for the subject.
     */
    void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;

    /**
     * Performs some action after {@link #update(GlobalQuotaSettingsImpl)} with the resulting quota
     * from the request action for the subject.
     */
    void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException;
  }

  /*
   * ========================================================================== Helpers
   */

  /**
   * Verifies that quota support is enabled and initialized. When it is enabled but not yet
   * initialized, polls every 100 ms for up to {@code hbase.master.wait.for.quota.manager.init}
   * milliseconds (default 30000) before giving up.
   * @throws DoNotRetryIOException if quota support is disabled in the configuration
   * @throws IOException           if the manager is still uninitialized after waiting or after
   *                               the wait was interrupted
   */
  private void checkQuotaSupport() throws IOException {
    if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
      throw new DoNotRetryIOException(new UnsupportedOperationException("quota support disabled"));
    }
    if (!initialized) {
      long maxWaitTime = masterServices.getConfiguration()
        .getLong("hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
      long startTime = EnvironmentEdgeManager.currentTime();
      do {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
          // Restore the interrupt status so callers further up the stack can still observe it.
          Thread.currentThread().interrupt();
          break;
        }
      } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
      if (!initialized) {
        throw new IOException("Quota manager is uninitialized, please retry later.");
      }
    }
  }

  private void createQuotaTable() throws IOException {
    masterServices.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
  }

  /**
   * A set of mutually exclusive locks keyed by name: {@link #lock(Object)} blocks while the same
   * name is already held, and {@link #unlock(Object)} releases the name and wakes all waiters.
   */
  private static class NamedLock<T> {
    private final HashSet<T> locks = new HashSet<>();

    public void lock(final T name) throws InterruptedException {
      synchronized (locks) {
        // Loop rather than 'if': notifyAll() wakes waiters for every name, not just this one.
        while (locks.contains(name)) {
          locks.wait();
        }
        locks.add(name);
      }
    }

    public void unlock(final T name) {
      synchronized (locks) {
        locks.remove(name);
        locks.notifyAll();
      }
    }
  }

  @Override
  public void onRegionSplitReverted(RegionInfo hri) throws IOException {
    if (initialized) {
      this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
    }
  }

  /**
   * Holds the size of a region at the given time, millis since the epoch.
   */
  private static class SizeSnapshotWithTimestamp {
    private final long size;
    private final long time;

    public SizeSnapshotWithTimestamp(long size, long time) {
      this.size = size;
      this.time = time;
    }

    public long getSize() {
      return size;
    }

    public long getTime() {
      return time;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof SizeSnapshotWithTimestamp) {
        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;
        return size == other.size && time == other.time;
      }
      return false;
    }

    @Override
    public int hashCode() {
      HashCodeBuilder hcb = new HashCodeBuilder();
      return hcb.append(size).append(time).toHashCode();
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder(32);
      sb.append("SizeSnapshotWithTimestamp={size=").append(size).append("B, ");
      sb.append("time=").append(time).append("}");
      return sb.toString();
    }
  }

  /** Creates the region size map; asserts that it has not been created before. */
  void initializeRegionSizes() {
    assert regionSizes == null;
    this.regionSizes = new ConcurrentHashMap<>();
  }

  /**
   * Records the size of a region as of the given time, replacing any previous entry for it.
   * Ignored when the region size map has not been created.
   */
  public void addRegionSize(RegionInfo hri, long size, long time) {
    if (regionSizes == null) {
      return;
    }
    regionSizes.put(hri, new SizeSnapshotWithTimestamp(size, time));
  }

  /**
   * Returns a copy of the tracked region sizes keyed by region, or an immutable empty map when
   * the region size map has not been created.
   */
  public Map<RegionInfo, Long> snapshotRegionSizes() {
    if (regionSizes == null) {
      return EMPTY_MAP;
    }

    Map<RegionInfo, Long> copy = new HashMap<>();
    for (Entry<RegionInfo, SizeSnapshotWithTimestamp> entry : regionSizes.entrySet()) {
      copy.put(entry.getKey(), entry.getValue().getSize());
    }
    return copy;
  }

  /**
   * Removes region size entries whose timestamp is older than the given time, except for regions
   * of tables that are in violation with the DISABLE policy.
   * @param timeToPruneBefore  entries with a time strictly below this value are candidates
   * @param quotaObserverChore source of the current table and namespace quota snapshots
   * @return the number of entries removed
   */
  int pruneEntriesOlderThan(long timeToPruneBefore, QuotaObserverChore quotaObserverChore) {
    if (regionSizes == null) {
      return 0;
    }
    int numEntriesRemoved = 0;
    Iterator<Entry<RegionInfo, SizeSnapshotWithTimestamp>> iterator =
      regionSizes.entrySet().iterator();
    while (iterator.hasNext()) {
      Entry<RegionInfo, SizeSnapshotWithTimestamp> entry = iterator.next();
      RegionInfo regionInfo = entry.getKey();
      // Use the value carried by the iterated entry: looking the key up again could return null
      // if the entry is removed concurrently (e.g. by removeRegionSizesForTable).
      long currentEntryTime = entry.getValue().getTime();
      // do not prune the entries if table is in violation and
      // violation policy is disable to avoid cycle of enable/disable.
      // Please refer HBASE-22012 for more details.
      // prune entries older than time.
      if (
        currentEntryTime < timeToPruneBefore
          && !isInViolationAndPolicyDisable(regionInfo.getTable(), quotaObserverChore)
      ) {
        iterator.remove();
        numEntriesRemoved++;
      }
    }
    return numEntriesRemoved;
  }

  /**
   * Method to check if a table is in violation and policy set on table is DISABLE. The check is
   * made against both the table's own snapshot and the snapshot of its namespace.
   * @param tableName          tableName to check.
   * @param quotaObserverChore QuotaObserverChore instance
   * @return returns true if table is in violation and policy is disable else false.
   */
  private boolean isInViolationAndPolicyDisable(TableName tableName,
    QuotaObserverChore quotaObserverChore) {
    // Get current snapshots for the given table and for its namespace
    SpaceQuotaSnapshot tableQuotaSnapshot = quotaObserverChore.getTableQuotaSnapshot(tableName);
    SpaceQuotaSnapshot namespaceQuotaSnapshot =
      quotaObserverChore.getNamespaceQuotaSnapshot(tableName.getNamespaceAsString());
    return isViolatingWithDisablePolicy(tableQuotaSnapshot)
      || isViolatingWithDisablePolicy(namespaceQuotaSnapshot);
  }

  /**
   * Returns true when the snapshot is non-null, reports a violation and carries the DISABLE
   * policy.
   */
  private static boolean isViolatingWithDisablePolicy(SpaceQuotaSnapshot snapshot) {
    if (snapshot == null) {
      return false;
    }
    SpaceQuotaStatus status = snapshot.getQuotaStatus();
    Optional<SpaceViolationPolicy> policy = status.getPolicy();
    return status.isInViolation() && policy.isPresent()
      && policy.get() == SpaceViolationPolicy.DISABLE;
  }

  /**
   * Removes each region size entry where the RegionInfo references the provided TableName. Does
   * nothing when the region size map has not been created.
   * @param tableName tableName.
   */
  public void removeRegionSizesForTable(TableName tableName) {
    if (regionSizes == null) {
      return;
    }
    regionSizes.keySet().removeIf(regionInfo -> regionInfo.getTable().equals(tableName));
  }

  /**
   * Groups the archived files in the request by table and hands each group to the
   * {@link FileArchiverNotifier} obtained for that table.
   * @param request the archive notification carrying file names, sizes and table names
   * @param conn    connection passed to the notifier factory
   * @param conf    configuration passed to the notifier factory
   * @param fs      file system passed to the notifier factory
   */
  public void processFileArchivals(FileArchiveNotificationRequest request, Connection conn,
    Configuration conf, FileSystem fs) throws IOException {
    final HashMultimap<TableName, Entry<String, Long>> archivedFilesByTable = HashMultimap.create();
    // Group the archived files by table
    for (FileWithSize fileWithSize : request.getArchivedFilesList()) {
      TableName tn = ProtobufUtil.toTableName(fileWithSize.getTableName());
      archivedFilesByTable.put(tn,
        Maps.immutableEntry(fileWithSize.getName(), fileWithSize.getSize()));
    }
    LOG.trace("Grouped archived files by table: {}", archivedFilesByTable);
    // Report each set of files to the appropriate object
    for (TableName tn : archivedFilesByTable.keySet()) {
      final Set<Entry<String, Long>> filesWithSize = archivedFilesByTable.get(tn);
      final FileArchiverNotifier notifier =
        FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn);
      notifier.addArchivedFiles(filesWithSize);
    }
  }
}