/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. This class manages its own {@link MetricRegistry} and {@link JmxReporter} so
 * as to not conflict with other uses of Yammer Metrics within the client application. Calling
 * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
 * "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
 * terminate the thread pools they allocate. The metrics reporter is shut down via
 * {@link #shutdown()} once all connections within this metrics instance are closed.
 */
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
    new ConcurrentHashMap<>();

  static MetricsConnection getMetricsConnection(final Configuration conf, final String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
      if (metricsConnection == null) {
        MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool);
        newMetricsConn.incrConnectionCount();
        return newMetricsConn;
      } else {
        metricsConnection.addThreadPools(batchPool, metaPool);
        metricsConnection.incrConnectionCount();
        return metricsConnection;
      }
    });
  }

  static void deleteMetricsConnection(final String scope) {
    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
      metricsConnection.decrConnectionCount();
      if (metricsConnection.getConnectionCount() == 0) {
        metricsConnection.shutdown();
        return null;
      }
      return metricsConnection;
    });
  }

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /** Set this key to {@code true} to enable table metrics collection of client requests. */
  public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
    "hbase.client.table.metrics.enable";

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to the JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo@7d9d0818", where "7d9d0818" is the hex hashCode of the underlying AsyncConnectionImpl.
   * Users may set this key to give a more contextual name for this scope. For example, one might
   * want to differentiate a read connection from a write connection by setting the scopes to
   * "foo-read" and "foo-write" respectively. Scope is the only thing that lends any uniqueness to
   * the metrics. Care should be taken to avoid using the same scope for multiple Connections,
   * otherwise the metrics may aggregate in unforeseen ways.
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";
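
  // Illustrative sketch (hypothetical client-side configuration, not code used by this class):
  // enabling client metrics and pinning a custom scope before opening a connection. The
  // HBaseConfiguration and ConnectionFactory calls are the standard HBase client entry points.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
  //   conf.set(MetricsConnection.METRICS_SCOPE_KEY, "foo-read");
  //   try (Connection connection = ConnectionFactory.createConnection(conf)) {
  //     // Metrics for this connection are published over JMX under the "foo-read" scope.
  //   }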
  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  private static final String CNT_BASE = "rpcCount_";
  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
  private static final String TOTAL_EXCEPTION_CNT = "rpcTotalExceptions";
  private static final String LOCAL_EXCEPTION_CNT_BASE = "rpcLocalExceptions_";
  private static final String REMOTE_EXCEPTION_CNT_BASE = "rpcRemoteExceptions_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /** A container class for collecting details about the RPC call as it percolates. */
  public static class CallStats {
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }
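
  // Illustrative sketch of how an RPC caller is expected to use CallStats together with
  // updateRpc(...) defined further down. The variables method, tableName, request, responseSize
  // and metrics are hypothetical placeholders, and EnvironmentEdgeManager is the HBase clock
  // utility; this is not code invoked from within this class.
  //
  //   MetricsConnection.CallStats stats = MetricsConnection.newCallStats();
  //   stats.setStartTime(EnvironmentEdgeManager.currentTime());
  //   stats.setRequestSizeBytes(request.getSerializedSize());
  //   // ... issue the RPC ...
  //   stats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - stats.getStartTime());
  //   stats.setResponseSizeBytes(responseSize);
  //   metrics.updateRpc(method, tableName, request, stats, null /* or the thrown exception */);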
  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    final Histogram delayIntervalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntervalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntervalHist.update(interval);
    }
  }

  private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    RegionStats regionStats =
      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
    regionStats.update(stats);
  }
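
  // For example, load stats reported by region "testRegion" hosted on server
  // "host1,16020,1600000000000" (both names made up for illustration) are recorded in two
  // per-region histograms whose names are built from the bases above and the
  // "<serverName>,<regionName>" key, i.e. "memstoreLoad_host1,16020,1600000000000,testRegion" and
  // "heapOccupancy_host1,16020,1600000000000,testRegion", prefixed by this class's name in the
  // registry.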
  /** A lambda for dispatching to the appropriate metric factory method */
  private static interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /** Anticipated number of concurrent accessor threads */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;
  private final boolean tableMetricsEnabled;

  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // Thread pool suppliers contributed by each connection sharing this metrics instance.
  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

  // static metrics

  private final Counter connectionCount;
  private final Counter metaCacheHits;
  private final Counter metaCacheMisses;
  private final CallTracker getTracker;
  private final CallTracker scanTracker;
  private final CallTracker appendTracker;
  private final CallTracker deleteTracker;
  private final CallTracker incrementTracker;
  private final CallTracker putTracker;
  private final CallTracker multiTracker;
  private final RunnerStats runnerStats;
  private final Counter metaCacheNumClearServer;
  private final Counter metaCacheNumClearRegion;
  private final Counter hedgedReadOps;
  private final Counter hedgedReadWin;
  private final Histogram concurrentCallsPerServerHist;
  private final Histogram numActionsPerServerHist;
  private final Counter nsLookups;
  private final Counter nsLookupsFailed;
  private final Timer overloadedBackoffTimer;
  private final Counter userRegionLockTimeoutCount;
  private final Timer userRegionLockWaitingTimer;
  private final Timer userRegionLockHeldTimer;
  private final Histogram userRegionLockQueueHist;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  private final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  private MetricsConnection(Configuration conf, String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    this.tableMetricsEnabled = conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false);
    addThreadPools(batchPool, metaPool);
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among batch pools of all connections */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.registry.register(getMetaPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /* The max thread usage ratio among meta lookup pools of all connections */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
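    // A note on the two gauges above: comparing numerator * maxPoolSize < activeCount * denominator
    // is the cross-multiplied form of numerator / denominator < activeCount / maxPoolSize, which
    // avoids integer division. For example, with a current best of 5/10 (0.50) and a candidate pool
    // at 3/4 (0.75): 5 * 4 = 20 < 3 * 10 = 30, so the candidate's higher usage ratio is kept.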
    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

    this.userRegionLockTimeoutCount =
      registry.counter(name(this.getClass(), "userRegionLockTimeoutCount", scope));
    this.userRegionLockWaitingTimer =
      registry.timer(name(this.getClass(), "userRegionLockWaitingDuration", scope));
    this.userRegionLockHeldTimer =
      registry.timer(name(this.getClass(), "userRegionLockHeldDuration", scope));
    this.userRegionLockQueueHist =
      registry.histogram(name(MetricsConnection.class, "userRegionLockQueueLength", scope));

    this.overloadedBackoffTimer =
      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** scope of the metrics object */
  public String getMetricScope() {
    return scope;
  }

  /** serverStats metric */
  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
    return serverStats;
  }

  /** runnerStats metric */
  public RunnerStats getRunnerStats() {
    return runnerStats;
  }

  /** metaCacheNumClearServer metric */
  public Counter getMetaCacheNumClearServer() {
    return metaCacheNumClearServer;
  }

  /** metaCacheNumClearRegion metric */
  public Counter getMetaCacheNumClearRegion() {
    return metaCacheNumClearRegion;
  }

  /** hedgedReadOps metric */
  public Counter getHedgedReadOps() {
    return hedgedReadOps;
  }

  /** hedgedReadWin metric */
  public Counter getHedgedReadWin() {
    return hedgedReadWin;
  }

  /** numActionsPerServerHist metric */
  public Histogram getNumActionsPerServerHist() {
    return numActionsPerServerHist;
  }

  /** rpcCounters metric */
  public ConcurrentMap<String, Counter> getRpcCounters() {
    return rpcCounters;
  }

  /** rpcTimers metric */
  public ConcurrentMap<String, Timer> getRpcTimers() {
    return rpcTimers;
  }

  /** rpcHistograms metric */
  public ConcurrentMap<String, Histogram> getRpcHistograms() {
    return rpcHistograms;
  }

  /** getTracker metric */
  public CallTracker getGetTracker() {
    return getTracker;
  }

  /** scanTracker metric */
  public CallTracker getScanTracker() {
    return scanTracker;
  }

  /** multiTracker metric */
  public CallTracker getMultiTracker() {
    return multiTracker;
  }

  /** appendTracker metric */
  public CallTracker getAppendTracker() {
    return appendTracker;
  }

  /** deleteTracker metric */
  public CallTracker getDeleteTracker() {
    return deleteTracker;
  }
  /** incrementTracker metric */
  public CallTracker getIncrementTracker() {
    return incrementTracker;
  }

  /** putTracker metric */
  public CallTracker getPutTracker() {
    return putTracker;
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  public long getMetaCacheMisses() {
    return metaCacheMisses.getCount();
  }

  /** Increment the number of meta cache drops requested for an entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for an individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /** Increment the number of meta cache drops requested for individual regions by {@code count}. */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged reads that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged reads that returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the count of normal runners. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the count of delay runners and update the delay interval of the delay runner. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
    overloadedBackoffTimer.update(time, timeUnit);
  }

  /** Increment the count of timeouts while waiting on the user region lock. */
  public void incrUserRegionLockTimeout() {
    userRegionLockTimeoutCount.inc();
  }

  /** userRegionLockTimeoutCount metric */
  public Counter getUserRegionLockTimeout() {
    return userRegionLockTimeoutCount;
  }

  public Timer getUserRegionLockWaitingTimer() {
    return userRegionLockWaitingTimer;
  }

  public Timer getUserRegionLockHeldTimer() {
    return userRegionLockHeldTimer;
  }

  public Histogram getUserRegionLockQueue() {
    return userRegionLockQueueHist;
  }

  /** Update the time spent waiting to acquire the user region lock, in milliseconds. */
  public void updateUserRegionLockWaiting(long duration) {
    userRegionLockWaitingTimer.update(duration, TimeUnit.MILLISECONDS);
  }

  /** Update the time the user region lock was held, in milliseconds. */
  public void updateUserRegionLockHeld(long duration) {
    userRegionLockHeldTimer.update(duration, TimeUnit.MILLISECONDS);
  }

  /** Update the histogram of threads queued on the user region lock. */
  public void updateUserRegionLockQueue(int count) {
    userRegionLockQueueHist.update(count);
  }

  /** Return the connection count of the metrics within a scope */
  public long getConnectionCount() {
    return connectionCount.getCount();
  }

  /** Increment the connection count of the metrics within a scope */
  private void incrConnectionCount() {
    connectionCount.inc();
  }

  /** Decrement the connection count of the metrics within a scope */
  private void decrConnectionCount() {
    connectionCount.dec();
  }
  /** Add thread pools of additional connections to the metrics */
  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    batchPools.add(batchPool);
    metaPools.add(metaPool);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  private void shutdown() {
    this.reporter.stop();
  }

  /** Report RPC context to metrics system. */
  public void updateRpc(MethodDescriptor method, TableName tableName, Message param,
    CallStats stats, Throwable e) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
    StringBuilder methodName = new StringBuilder();
    methodName.append(method.getService().getName()).append("_").append(method.getName());
    // Distinguish mutate types.
    if ("Mutate".equals(method.getName())) {
      final MutationType type = ((MutateRequest) param).getMutation().getMutateType();
      switch (type) {
        case APPEND:
          methodName.append("(Append)");
          break;
        case DELETE:
          methodName.append("(Delete)");
          break;
        case INCREMENT:
          methodName.append("(Increment)");
          break;
        case PUT:
          methodName.append("(Put)");
          break;
        default:
          methodName.append("(Unknown)");
      }
    }
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    if (e != null) {
      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
      getMetric(TOTAL_EXCEPTION_CNT, rpcCounters, counterFactory).inc();
      if (e instanceof RemoteException) {
        String fullClassName = ((RemoteException) e).getClassName();
        String simpleClassName = (fullClassName != null)
          ? fullClassName.substring(fullClassName.lastIndexOf(".") + 1)
          : "unknown";
        getMetric(REMOTE_EXCEPTION_CNT_BASE + simpleClassName, rpcCounters, counterFactory).inc();
      } else {
        getMetric(LOCAL_EXCEPTION_CNT_BASE + e.getClass().getSimpleName(), rpcCounters,
          counterFactory).inc();
      }
    }
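    // For example, a failed Put is tracked under the dynamic key "ClientService_Mutate(Put)": the
    // counters "rpcCount_ClientService_Mutate(Put)" and "rpcFailureCount_ClientService_Mutate(Put)"
    // are bumped, along with "rpcTotalExceptions" and one of the per-exception-class counters
    // above, all created lazily through getMetric(...).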
    // this implementation is tied directly to protobuf implementation details. would be better
    // if we could dispatch based on something static, i.e., request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              break;
            case DELETE:
              deleteTracker.updateRpc(stats);
              break;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              break;
            case PUT:
              putTracker.updateRpc(stats);
              break;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName.toString(), stats);
  }

  /** Report table rpc context to metrics system. */
  private void updateTableMetric(String methodName, TableName tableName, CallStats stats,
    Throwable e) {
    if (tableMetricsEnabled) {
      if (methodName != null) {
        String table = tableName != null && StringUtils.isNotEmpty(tableName.getNameAsString())
          ? tableName.getNameAsString()
          : "unknown";
        String metricKey = methodName + "_" + table;
        // Update table RPC context in the metrics system, including RPC call duration and
        // request/response sizes (bytes).
        updateRpcGeneric(metricKey, stats);
        if (e != null) {
          // RPC failure counter keyed by method and table name.
          getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, counterFactory).inc();
        }
      }
    }
  }

  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}