001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Set; 029import java.util.concurrent.TimeUnit; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.hbase.ScheduledChore; 034import org.apache.hadoop.hbase.Stoppable; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.Admin; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.Delete; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.master.HMaster; 041import org.apache.hadoop.hbase.master.MetricsMaster; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 047import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 048 049/** 050 * A Master-invoked {@code Chore} that computes the size of each snapshot which was created from a 051 * table which has a space quota. 052 */ 053@InterfaceAudience.Private 054public class SnapshotQuotaObserverChore extends ScheduledChore { 055 private static final Logger LOG = LoggerFactory.getLogger(SnapshotQuotaObserverChore.class); 056 static final String SNAPSHOT_QUOTA_CHORE_PERIOD_KEY = "hbase.master.quotas.snapshot.chore.period"; 057 static final int SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5; // 5 minutes in millis 058 059 static final String SNAPSHOT_QUOTA_CHORE_DELAY_KEY = "hbase.master.quotas.snapshot.chore.delay"; 060 static final long SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT = 1000L * 60L; // 1 minute in millis 061 062 static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY = 063 "hbase.master.quotas.snapshot.chore.timeunit"; 064 static final String SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name(); 065 066 private final Connection conn; 067 private final Configuration conf; 068 private final MetricsMaster metrics; 069 private final FileSystem fs; 070 071 public SnapshotQuotaObserverChore(HMaster master, MetricsMaster metrics) { 072 this(master.getConnection(), master.getConfiguration(), master.getFileSystem(), master, 073 metrics); 074 } 075 076 SnapshotQuotaObserverChore(Connection conn, Configuration conf, FileSystem fs, Stoppable stopper, 077 MetricsMaster metrics) { 078 super(QuotaObserverChore.class.getSimpleName(), stopper, getPeriod(conf), getInitialDelay(conf), 079 getTimeUnit(conf)); 080 this.conn = conn; 081 this.conf = conf; 082 this.metrics = metrics; 083 this.fs = fs; 084 } 085 086 @Override 087 protected void chore() { 088 try { 089 if (LOG.isTraceEnabled()) { 090 LOG.trace("Computing sizes of snapshots for quota management."); 091 } 092 long start = System.nanoTime(); 093 _chore(); 094 if (null != metrics) { 095 metrics.incrementSnapshotObserverTime((System.nanoTime() - start) / 1_000_000); 096 } 097 } catch (IOException e) { 098 LOG.warn("Failed to compute the size of snapshots, will retry", e); 099 } 100 } 101 102 void _chore() throws IOException { 103 // Gets all tables with quotas that also have snapshots. 104 // This values are all of the snapshots that we need to compute the size of. 105 long start = System.nanoTime(); 106 Multimap<TableName, String> snapshotsToComputeSize = getSnapshotsToComputeSize(); 107 if (null != metrics) { 108 metrics.incrementSnapshotFetchTime((System.nanoTime() - start) / 1_000_000); 109 } 110 111 // Remove old table snapshots data 112 pruneTableSnapshots(snapshotsToComputeSize); 113 114 // Remove old namespace snapshots data 115 pruneNamespaceSnapshots(snapshotsToComputeSize); 116 117 // For each table, compute the size of each snapshot 118 Map<String, Long> namespaceSnapshotSizes = computeSnapshotSizes(snapshotsToComputeSize); 119 120 // Write the size data by namespaces to the quota table. 121 // We need to do this "globally" since each FileArchiverNotifier is limited to its own Table. 122 persistSnapshotSizesForNamespaces(namespaceSnapshotSizes); 123 } 124 125 /** 126 * Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize 127 * @param snapshotsToComputeSize list of snapshots to be persisted 128 */ 129 void pruneTableSnapshots(Multimap<TableName, String> snapshotsToComputeSize) throws IOException { 130 Multimap<TableName, String> existingSnapshotEntries = QuotaTableUtil.getTableSnapshots(conn); 131 Multimap<TableName, String> snapshotEntriesToRemove = HashMultimap.create(); 132 for (Entry<TableName, Collection<String>> entry : existingSnapshotEntries.asMap().entrySet()) { 133 TableName tn = entry.getKey(); 134 Set<String> setOfSnapshots = new HashSet<>(entry.getValue()); 135 for (String snapshot : snapshotsToComputeSize.get(tn)) { 136 setOfSnapshots.remove(snapshot); 137 } 138 139 for (String snapshot : setOfSnapshots) { 140 snapshotEntriesToRemove.put(tn, snapshot); 141 } 142 } 143 removeExistingTableSnapshotSizes(snapshotEntriesToRemove); 144 } 145 146 /** 147 * Removes the snapshot entries that are present in Quota table but not in snapshotsToComputeSize 148 * @param snapshotsToComputeSize list of snapshots to be persisted 149 */ 150 void pruneNamespaceSnapshots(Multimap<TableName, String> snapshotsToComputeSize) 151 throws IOException { 152 Set<String> existingSnapshotEntries = QuotaTableUtil.getNamespaceSnapshots(conn); 153 for (TableName tableName : snapshotsToComputeSize.keySet()) { 154 existingSnapshotEntries.remove(tableName.getNamespaceAsString()); 155 } 156 // here existingSnapshotEntries is left with the entries to be removed 157 removeExistingNamespaceSnapshotSizes(existingSnapshotEntries); 158 } 159 160 /** 161 * Fetches each table with a quota (table or namespace quota), and then fetch the name of each 162 * snapshot which was created from that table. 163 * @return A mapping of table to snapshots created from that table 164 */ 165 Multimap<TableName, String> getSnapshotsToComputeSize() throws IOException { 166 Set<TableName> tablesToFetchSnapshotsFrom = new HashSet<>(); 167 QuotaFilter filter = new QuotaFilter(); 168 filter.addTypeFilter(QuotaType.SPACE); 169 try (Admin admin = conn.getAdmin()) { 170 // Pull all of the tables that have quotas (direct, or from namespace) 171 for (QuotaSettings qs : QuotaRetriever.open(conf, filter)) { 172 if (qs.getQuotaType() == QuotaType.SPACE) { 173 String ns = qs.getNamespace(); 174 TableName tn = qs.getTableName(); 175 if ((null == ns && null == tn) || (null != ns && null != tn)) { 176 throw new IllegalStateException( 177 "Expected either one of namespace and tablename to be null but not both"); 178 } 179 // Collect either the table name itself, or all of the tables in the namespace 180 if (null != ns) { 181 tablesToFetchSnapshotsFrom.addAll(Arrays.asList(admin.listTableNamesByNamespace(ns))); 182 } else { 183 tablesToFetchSnapshotsFrom.add(tn); 184 } 185 } 186 } 187 // Fetch all snapshots that were created from these tables 188 return getSnapshotsFromTables(admin, tablesToFetchSnapshotsFrom); 189 } 190 } 191 192 /** 193 * Computes a mapping of originating {@code TableName} to snapshots, when the {@code TableName} 194 * exists in the provided {@code Set}. 195 */ 196 Multimap<TableName, String> getSnapshotsFromTables(Admin admin, 197 Set<TableName> tablesToFetchSnapshotsFrom) throws IOException { 198 Multimap<TableName, String> snapshotsToCompute = HashMultimap.create(); 199 for (org.apache.hadoop.hbase.client.SnapshotDescription sd : admin.listSnapshots()) { 200 TableName tn = sd.getTableName(); 201 if (tablesToFetchSnapshotsFrom.contains(tn)) { 202 snapshotsToCompute.put(tn, sd.getName()); 203 } 204 } 205 return snapshotsToCompute; 206 } 207 208 /** 209 * Computes the size of each snapshot provided given the current files referenced by the table. 210 * @param snapshotsToComputeSize The snapshots to compute the size of 211 * @return A mapping of table to snapshot created from that table and the snapshot's size. 212 */ 213 Map<String, Long> computeSnapshotSizes(Multimap<TableName, String> snapshotsToComputeSize) 214 throws IOException { 215 final Map<String, Long> snapshotSizesByNamespace = new HashMap<>(); 216 final long start = System.nanoTime(); 217 for (Entry<TableName, Collection<String>> entry : snapshotsToComputeSize.asMap().entrySet()) { 218 final TableName tn = entry.getKey(); 219 final Collection<String> snapshotNames = entry.getValue(); 220 221 // Get our notifier instance, this is tracking archivals that happen out-of-band of this chore 222 FileArchiverNotifier notifier = getNotifierForTable(tn); 223 224 // The total size consumed by all snapshots against this table 225 long totalSnapshotSize = notifier.computeAndStoreSnapshotSizes(snapshotNames); 226 // Bucket that size into the appropriate namespace 227 snapshotSizesByNamespace.merge(tn.getNamespaceAsString(), totalSnapshotSize, Long::sum); 228 } 229 230 // Update the amount of time it took to compute the size of the snapshots for a table 231 if (metrics != null) { 232 metrics.incrementSnapshotSizeComputationTime((System.nanoTime() - start) / 1_000_000); 233 } 234 235 return snapshotSizesByNamespace; 236 } 237 238 /** 239 * Returns the correct instance of {@link FileArchiverNotifier} for the given table name. 240 * @param tn The table name 241 * @return A {@link FileArchiverNotifier} instance 242 */ 243 FileArchiverNotifier getNotifierForTable(TableName tn) { 244 return FileArchiverNotifierFactoryImpl.getInstance().get(conn, conf, fs, tn); 245 } 246 247 /** 248 * Writes the size used by snapshots for each namespace to the quota table. 249 */ 250 void persistSnapshotSizesForNamespaces(Map<String, Long> snapshotSizesByNamespace) 251 throws IOException { 252 try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 253 quotaTable.put(snapshotSizesByNamespace.entrySet().stream() 254 .map(e -> QuotaTableUtil.createPutForNamespaceSnapshotSize(e.getKey(), e.getValue())) 255 .collect(Collectors.toList())); 256 } 257 } 258 259 void removeExistingTableSnapshotSizes(Multimap<TableName, String> snapshotEntriesToRemove) 260 throws IOException { 261 removeExistingSnapshotSizes( 262 QuotaTableUtil.createDeletesForExistingTableSnapshotSizes(snapshotEntriesToRemove)); 263 } 264 265 void removeExistingNamespaceSnapshotSizes(Set<String> snapshotEntriesToRemove) 266 throws IOException { 267 removeExistingSnapshotSizes( 268 QuotaTableUtil.createDeletesForExistingNamespaceSnapshotSizes(snapshotEntriesToRemove)); 269 } 270 271 void removeExistingSnapshotSizes(List<Delete> deletes) throws IOException { 272 try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) { 273 quotaTable.delete(deletes); 274 } 275 } 276 277 /** 278 * Extracts the period for the chore from the configuration. 279 * @param conf The configuration object. 280 * @return The configured chore period or the default value. 281 */ 282 static int getPeriod(Configuration conf) { 283 return conf.getInt(SNAPSHOT_QUOTA_CHORE_PERIOD_KEY, SNAPSHOT_QUOTA_CHORE_PERIOD_DEFAULT); 284 } 285 286 /** 287 * Extracts the initial delay for the chore from the configuration. 288 * @param conf The configuration object. 289 * @return The configured chore initial delay or the default value. 290 */ 291 static long getInitialDelay(Configuration conf) { 292 return conf.getLong(SNAPSHOT_QUOTA_CHORE_DELAY_KEY, SNAPSHOT_QUOTA_CHORE_DELAY_DEFAULT); 293 } 294 295 /** 296 * Extracts the time unit for the chore period and initial delay from the configuration. The 297 * configuration value for {@link #SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY} must correspond to a 298 * {@link TimeUnit} value. 299 * @param conf The configuration object. 300 * @return The configured time unit for the chore period and initial delay or the default value. 301 */ 302 static TimeUnit getTimeUnit(Configuration conf) { 303 return TimeUnit 304 .valueOf(conf.get(SNAPSHOT_QUOTA_CHORE_TIMEUNIT_KEY, SNAPSHOT_QUOTA_CHORE_TIMEUNIT_DEFAULT)); 305 } 306}