001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.HashSet; 029import java.util.Map; 030import java.util.Map.Entry; 031import java.util.Set; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034import java.util.concurrent.atomic.AtomicReference; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.hbase.Cell; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.NamespaceDescriptor; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.Waiter.Predicate; 043import org.apache.hadoop.hbase.client.Admin; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Get; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.SnapshotDescription; 048import org.apache.hadoop.hbase.client.SnapshotType; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.master.HMaster; 051import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.NoFilesToDischarge; 052import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate; 053import org.apache.hadoop.hbase.regionserver.HStore; 054import org.apache.hadoop.hbase.testclassification.LargeTests; 055import org.junit.AfterClass; 056import org.junit.Before; 057import org.junit.BeforeClass; 058import org.junit.ClassRule; 059import org.junit.Rule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.rules.TestName; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 067import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 068import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 069 070/** 071 * Test class for the {@link SnapshotQuotaObserverChore}. 072 */ 073@Category(LargeTests.class) 074public class TestSnapshotQuotaObserverChore { 075 076 @ClassRule 077 public static final HBaseClassTestRule CLASS_RULE = 078 HBaseClassTestRule.forClass(TestSnapshotQuotaObserverChore.class); 079 080 private static final Logger LOG = LoggerFactory.getLogger(TestSnapshotQuotaObserverChore.class); 081 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 082 private static final AtomicLong COUNTER = new AtomicLong(); 083 084 @Rule 085 public TestName testName = new TestName(); 086 087 private Connection conn; 088 private Admin admin; 089 private SpaceQuotaHelperForTests helper; 090 private HMaster master; 091 private SnapshotQuotaObserverChore testChore; 092 093 @BeforeClass 094 public static void setUp() throws Exception { 095 Configuration conf = TEST_UTIL.getConfiguration(); 096 SpaceQuotaHelperForTests.updateConfigForQuotas(conf); 097 // Clean up the compacted files faster than normal (15s instead of 2mins) 098 conf.setInt("hbase.hfile.compaction.discharger.interval", 15 * 1000); 099 TEST_UTIL.startMiniCluster(1); 100 } 101 102 @AfterClass 103 public static void tearDown() throws Exception { 104 TEST_UTIL.shutdownMiniCluster(); 105 } 106 107 @Before 108 public void setup() throws Exception { 109 conn = TEST_UTIL.getConnection(); 110 admin = TEST_UTIL.getAdmin(); 111 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER); 112 master = TEST_UTIL.getHBaseCluster().getMaster(); 113 helper.removeAllQuotas(conn); 114 testChore = new SnapshotQuotaObserverChore(TEST_UTIL.getConnection(), 115 TEST_UTIL.getConfiguration(), master.getFileSystem(), master, null); 116 } 117 118 @Test 119 public void testSnapshotsFromTables() throws Exception { 120 TableName tn1 = helper.createTableWithRegions(1); 121 TableName tn2 = helper.createTableWithRegions(1); 122 TableName tn3 = helper.createTableWithRegions(1); 123 124 // Set a space quota on table 1 and 2 (but not 3) 125 admin.setQuota(QuotaSettingsFactory.limitTableSpace(tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, 126 SpaceViolationPolicy.NO_INSERTS)); 127 admin.setQuota(QuotaSettingsFactory.limitTableSpace(tn2, SpaceQuotaHelperForTests.ONE_GIGABYTE, 128 SpaceViolationPolicy.NO_INSERTS)); 129 130 // Create snapshots on each table (we didn't write any data, so just skipflush) 131 admin.snapshot(new SnapshotDescription(tn1 + "snapshot", tn1, SnapshotType.SKIPFLUSH)); 132 admin.snapshot(new SnapshotDescription(tn2 + "snapshot", tn2, SnapshotType.SKIPFLUSH)); 133 admin.snapshot(new SnapshotDescription(tn3 + "snapshot", tn3, SnapshotType.SKIPFLUSH)); 134 135 Multimap<TableName, String> mapping = testChore.getSnapshotsToComputeSize(); 136 assertEquals(2, mapping.size()); 137 assertEquals(1, mapping.get(tn1).size()); 138 assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next()); 139 assertEquals(1, mapping.get(tn2).size()); 140 assertEquals(tn2 + "snapshot", mapping.get(tn2).iterator().next()); 141 142 admin.snapshot(new SnapshotDescription(tn2 + "snapshot1", tn2, SnapshotType.SKIPFLUSH)); 143 admin.snapshot(new SnapshotDescription(tn3 + "snapshot1", tn3, SnapshotType.SKIPFLUSH)); 144 145 mapping = testChore.getSnapshotsToComputeSize(); 146 assertEquals(3, mapping.size()); 147 assertEquals(1, mapping.get(tn1).size()); 148 assertEquals(tn1 + "snapshot", mapping.get(tn1).iterator().next()); 149 assertEquals(2, mapping.get(tn2).size()); 150 assertEquals(new HashSet<String>(Arrays.asList(tn2 + "snapshot", tn2 + "snapshot1")), 151 mapping.get(tn2)); 152 } 153 154 @Test 155 public void testSnapshotsFromNamespaces() throws Exception { 156 NamespaceDescriptor ns = NamespaceDescriptor.create("snapshots_from_namespaces").build(); 157 admin.createNamespace(ns); 158 159 TableName tn1 = helper.createTableWithRegions(ns.getName(), 1); 160 TableName tn2 = helper.createTableWithRegions(ns.getName(), 1); 161 TableName tn3 = helper.createTableWithRegions(1); 162 163 // Set a throttle quota on 'default' namespace 164 admin.setQuota(QuotaSettingsFactory.throttleNamespace(tn3.getNamespaceAsString(), 165 ThrottleType.WRITE_NUMBER, 100, TimeUnit.SECONDS)); 166 // Set a user throttle quota 167 admin.setQuota( 168 QuotaSettingsFactory.throttleUser("user", ThrottleType.WRITE_NUMBER, 100, TimeUnit.MINUTES)); 169 170 // Set a space quota on the namespace 171 admin.setQuota(QuotaSettingsFactory.limitNamespaceSpace(ns.getName(), 172 SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS)); 173 174 // Create snapshots on each table (we didn't write any data, so just skipflush) 175 admin.snapshot(new SnapshotDescription(tn1.getQualifierAsString() + "snapshot", tn1, 176 SnapshotType.SKIPFLUSH)); 177 admin.snapshot(new SnapshotDescription(tn2.getQualifierAsString() + "snapshot", tn2, 178 SnapshotType.SKIPFLUSH)); 179 admin.snapshot(new SnapshotDescription(tn3.getQualifierAsString() + "snapshot", tn3, 180 SnapshotType.SKIPFLUSH)); 181 182 Multimap<TableName, String> mapping = testChore.getSnapshotsToComputeSize(); 183 assertEquals(2, mapping.size()); 184 assertEquals(1, mapping.get(tn1).size()); 185 assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next()); 186 assertEquals(1, mapping.get(tn2).size()); 187 assertEquals(tn2.getQualifierAsString() + "snapshot", mapping.get(tn2).iterator().next()); 188 189 admin.snapshot(new SnapshotDescription(tn2.getQualifierAsString() + "snapshot1", tn2, 190 SnapshotType.SKIPFLUSH)); 191 admin.snapshot(new SnapshotDescription(tn3.getQualifierAsString() + "snapshot2", tn3, 192 SnapshotType.SKIPFLUSH)); 193 194 mapping = testChore.getSnapshotsToComputeSize(); 195 assertEquals(3, mapping.size()); 196 assertEquals(1, mapping.get(tn1).size()); 197 assertEquals(tn1.getQualifierAsString() + "snapshot", mapping.get(tn1).iterator().next()); 198 assertEquals(2, mapping.get(tn2).size()); 199 assertEquals(new HashSet<String>(Arrays.asList(tn2.getQualifierAsString() + "snapshot", 200 tn2.getQualifierAsString() + "snapshot1")), mapping.get(tn2)); 201 } 202 203 @Test 204 public void testSnapshotSize() throws Exception { 205 // Create a table and set a quota 206 TableName tn1 = helper.createTableWithRegions(5); 207 admin.setQuota(QuotaSettingsFactory.limitTableSpace(tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, 208 SpaceViolationPolicy.NO_INSERTS)); 209 210 // Write some data and flush it 211 helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); 212 admin.flush(tn1); 213 214 final long snapshotSize = TEST_UTIL.getMiniHBaseCluster().getRegions(tn1).stream() 215 .flatMap(r -> r.getStores().stream()).mapToLong(HStore::getHFilesSize).sum(); 216 217 // Wait for the Master chore to run to see the usage (with a fudge factor) 218 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 219 @Override 220 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 221 return snapshot.getUsage() == snapshotSize; 222 } 223 }); 224 225 // Create a snapshot on the table 226 final String snapshotName = tn1 + "snapshot"; 227 admin.snapshot(new SnapshotDescription(snapshotName, tn1, SnapshotType.SKIPFLUSH)); 228 229 // Get the snapshots 230 Multimap<TableName, String> snapshotsToCompute = testChore.getSnapshotsToComputeSize(); 231 assertEquals("Expected to see the single snapshot: " + snapshotsToCompute, 1, 232 snapshotsToCompute.size()); 233 234 // Get the size of our snapshot 235 Map<String, Long> namespaceSnapshotSizes = testChore.computeSnapshotSizes(snapshotsToCompute); 236 assertEquals(1, namespaceSnapshotSizes.size()); 237 Long size = namespaceSnapshotSizes.get(tn1.getNamespaceAsString()); 238 assertNotNull(size); 239 // The snapshot should take up no space since the table refers to it completely 240 assertEquals(0, size.longValue()); 241 242 // Write some more data, flush it, and then major_compact the table 243 helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); 244 admin.flush(tn1); 245 TEST_UTIL.compact(tn1, true); 246 247 // Test table should reflect it's original size since ingest was deterministic 248 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 249 private final long regionSize = TEST_UTIL.getMiniHBaseCluster().getRegions(tn1).stream() 250 .flatMap(r -> r.getStores().stream()).mapToLong(HStore::getHFilesSize).sum(); 251 252 @Override 253 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 254 LOG.debug("Current usage=" + snapshot.getUsage() + " snapshotSize=" + snapshotSize); 255 // The usage of table space consists of region size and snapshot size 256 return closeInSize(snapshot.getUsage(), snapshotSize + regionSize, 257 SpaceQuotaHelperForTests.ONE_KILOBYTE); 258 } 259 }); 260 261 // Wait for no compacted files on the regions of our table 262 TEST_UTIL.waitFor(30_000, new NoFilesToDischarge(TEST_UTIL.getMiniHBaseCluster(), tn1)); 263 264 // Still should see only one snapshot 265 snapshotsToCompute = testChore.getSnapshotsToComputeSize(); 266 assertEquals("Expected to see the single snapshot: " + snapshotsToCompute, 1, 267 snapshotsToCompute.size()); 268 namespaceSnapshotSizes = testChore.computeSnapshotSizes(snapshotsToCompute); 269 assertEquals(1, namespaceSnapshotSizes.size()); 270 size = namespaceSnapshotSizes.get(tn1.getNamespaceAsString()); 271 assertNotNull(size); 272 // The snapshot should take up the size the table originally took up 273 assertEquals(snapshotSize, size.longValue()); 274 } 275 276 @Test 277 public void testPersistingSnapshotsForNamespaces() throws Exception { 278 TableName tn1 = TableName.valueOf("ns1:tn1"); 279 TableName tn2 = TableName.valueOf("ns1:tn2"); 280 TableName tn3 = TableName.valueOf("ns2:tn1"); 281 TableName tn4 = TableName.valueOf("ns2:tn2"); 282 TableName tn5 = TableName.valueOf("tn1"); 283 // Shim in a custom factory to avoid computing snapshot sizes. 284 FileArchiverNotifierFactory test = new FileArchiverNotifierFactory() { 285 Map<TableName, Long> tableToSize = 286 ImmutableMap.of(tn1, 1024L, tn2, 1024L, tn3, 512L, tn4, 1024L, tn5, 3072L); 287 288 @Override 289 public FileArchiverNotifier get(Connection conn, Configuration conf, FileSystem fs, 290 TableName tn) { 291 return new FileArchiverNotifier() { 292 @Override 293 public void addArchivedFiles(Set<Entry<String, Long>> fileSizes) throws IOException { 294 } 295 296 @Override 297 public long computeAndStoreSnapshotSizes(Collection<String> currentSnapshots) 298 throws IOException { 299 return tableToSize.get(tn); 300 } 301 }; 302 } 303 }; 304 try { 305 FileArchiverNotifierFactoryImpl.setInstance(test); 306 307 Multimap<TableName, String> snapshotsToCompute = HashMultimap.create(); 308 snapshotsToCompute.put(tn1, ""); 309 snapshotsToCompute.put(tn2, ""); 310 snapshotsToCompute.put(tn3, ""); 311 snapshotsToCompute.put(tn4, ""); 312 snapshotsToCompute.put(tn5, ""); 313 Map<String, Long> nsSizes = testChore.computeSnapshotSizes(snapshotsToCompute); 314 assertEquals(3, nsSizes.size()); 315 assertEquals(2048L, (long) nsSizes.get("ns1")); 316 assertEquals(1536L, (long) nsSizes.get("ns2")); 317 assertEquals(3072L, (long) nsSizes.get(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR)); 318 } finally { 319 FileArchiverNotifierFactoryImpl.reset(); 320 } 321 } 322 323 @Test 324 public void testRemovedSnapshots() throws Exception { 325 // Create a table and set a quota 326 TableName tn1 = helper.createTableWithRegions(1); 327 admin.setQuota(QuotaSettingsFactory.limitTableSpace(tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, 328 SpaceViolationPolicy.NO_INSERTS)); 329 330 // Write some data and flush it 331 helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); // 256 KB 332 333 final AtomicReference<Long> lastSeenSize = new AtomicReference<>(); 334 // Wait for the Master chore to run to see the usage (with a fudge factor) 335 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 336 @Override 337 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 338 lastSeenSize.set(snapshot.getUsage()); 339 return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE; 340 } 341 }); 342 343 // Create a snapshot on the table 344 final String snapshotName1 = tn1 + "snapshot1"; 345 admin.snapshot(new SnapshotDescription(snapshotName1, tn1, SnapshotType.SKIPFLUSH)); 346 347 // Snapshot size has to be 0 as the snapshot shares the data with the table 348 final Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME); 349 TEST_UTIL.waitFor(30_000, new Predicate<Exception>() { 350 @Override 351 public boolean evaluate() throws Exception { 352 Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName1); 353 Result r = quotaTable.get(g); 354 if (r == null || r.isEmpty()) { 355 return false; 356 } 357 r.advance(); 358 Cell c = r.current(); 359 return QuotaTableUtil.parseSnapshotSize(c) == 0; 360 } 361 }); 362 // Total usage has to remain same as what we saw before taking a snapshot 363 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 364 @Override 365 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 366 return snapshot.getUsage() == lastSeenSize.get(); 367 } 368 }); 369 370 // Major compact the table to force a rewrite 371 TEST_UTIL.compact(tn1, true); 372 // Now the snapshot size has to prev total size 373 TEST_UTIL.waitFor(30_000, new Predicate<Exception>() { 374 @Override 375 public boolean evaluate() throws Exception { 376 Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName1); 377 Result r = quotaTable.get(g); 378 if (r == null || r.isEmpty()) { 379 return false; 380 } 381 r.advance(); 382 Cell c = r.current(); 383 // The compaction result file has an additional compaction event tracker 384 return lastSeenSize.get() == QuotaTableUtil.parseSnapshotSize(c); 385 } 386 }); 387 // The total size now has to be equal/more than double of prev total size 388 // as double the number of store files exist now. 389 final AtomicReference<Long> sizeAfterCompaction = new AtomicReference<>(); 390 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 391 @Override 392 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 393 sizeAfterCompaction.set(snapshot.getUsage()); 394 return snapshot.getUsage() >= 2 * lastSeenSize.get(); 395 } 396 }); 397 398 // Delete the snapshot 399 admin.deleteSnapshot(snapshotName1); 400 // Total size has to come down to prev totalsize - snapshot size(which was removed) 401 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 402 @Override 403 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 404 return snapshot.getUsage() == (sizeAfterCompaction.get() - lastSeenSize.get()); 405 } 406 }); 407 } 408 409 @Test 410 public void testBucketingFilesToSnapshots() throws Exception { 411 // Create a table and set a quota 412 TableName tn1 = helper.createTableWithRegions(1); 413 admin.setQuota(QuotaSettingsFactory.limitTableSpace(tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, 414 SpaceViolationPolicy.NO_INSERTS)); 415 416 // Write some data and flush it 417 helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE); 418 admin.flush(tn1); 419 420 final AtomicReference<Long> lastSeenSize = new AtomicReference<>(); 421 // Wait for the Master chore to run to see the usage (with a fudge factor) 422 TEST_UTIL.waitFor(30_000, new SpaceQuotaSnapshotPredicate(conn, tn1) { 423 @Override 424 boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception { 425 lastSeenSize.set(snapshot.getUsage()); 426 return snapshot.getUsage() > 230L * SpaceQuotaHelperForTests.ONE_KILOBYTE; 427 } 428 }); 429 430 // Create a snapshot on the table 431 final String snapshotName1 = tn1 + "snapshot1"; 432 admin.snapshot(new SnapshotDescription(snapshotName1, tn1, SnapshotType.SKIPFLUSH)); 433 // Major compact the table to force a rewrite 434 TEST_UTIL.compact(tn1, true); 435 436 // Make sure that the snapshot owns the size 437 final Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME); 438 TEST_UTIL.waitFor(30_000, new Predicate<Exception>() { 439 @Override 440 public boolean evaluate() throws Exception { 441 Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName1); 442 Result r = quotaTable.get(g); 443 if (r == null || r.isEmpty()) { 444 return false; 445 } 446 r.advance(); 447 Cell c = r.current(); 448 // The compaction result file has an additional compaction event tracker 449 return lastSeenSize.get() <= QuotaTableUtil.parseSnapshotSize(c); 450 } 451 }); 452 453 // Create another snapshot on the table 454 final String snapshotName2 = tn1 + "snapshot2"; 455 admin.snapshot(new SnapshotDescription(snapshotName2, tn1, SnapshotType.SKIPFLUSH)); 456 // Major compact the table to force a rewrite 457 TEST_UTIL.compact(tn1, true); 458 459 // Make sure that the snapshot owns the size 460 TEST_UTIL.waitFor(30_000, new Predicate<Exception>() { 461 @Override 462 public boolean evaluate() throws Exception { 463 Get g = QuotaTableUtil.makeGetForSnapshotSize(tn1, snapshotName2); 464 Result r = quotaTable.get(g); 465 if (r == null || r.isEmpty()) { 466 return false; 467 } 468 r.advance(); 469 Cell c = r.current(); 470 // The compaction result file has an additional compaction event tracker 471 return lastSeenSize.get() <= QuotaTableUtil.parseSnapshotSize(c); 472 } 473 }); 474 475 Get g = QuotaTableUtil.createGetNamespaceSnapshotSize(tn1.getNamespaceAsString()); 476 Result r = quotaTable.get(g); 477 assertNotNull(r); 478 assertFalse(r.isEmpty()); 479 r.advance(); 480 long size = QuotaTableUtil.parseSnapshotSize(r.current()); 481 assertTrue(lastSeenSize.get() * 2 <= size); 482 } 483 484 /** 485 * Computes if {@code size2} is within {@code delta} of {@code size1}, inclusive. 486 */ 487 boolean closeInSize(long size1, long size2, long delta) { 488 long lower = size1 - delta; 489 long upper = size1 + delta; 490 return lower <= size2 && size2 <= upper; 491 } 492}