/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * This class is for maintaining the various connection statistics and publishing them through the
 * metrics interfaces. It manages its own {@link MetricRegistry} and {@link JmxReporter} so as not
 * to conflict with other uses of Yammer Metrics within the client application. Calling
 * {@link #getMetricsConnection(Configuration, String, Supplier, Supplier)} implicitly creates and
 * "starts" instances of these classes; be sure to call {@link #deleteMetricsConnection(String)} to
 * terminate the thread pools they allocate. The metrics reporter is shut down via
 * {@link #shutdown()} once all connections within this metrics instance are closed.
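 * <p>
 * As an illustrative sketch (not part of this class's API), a client application might enable
 * collection and choose a custom scope through configuration before creating a connection:
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
 * conf.set(MetricsConnection.METRICS_SCOPE_KEY, "foo-read");
 * try (Connection connection = ConnectionFactory.createConnection(conf)) {
 *   // Client metrics for this connection are published over JMX under the "foo-read" scope.
 * }
 * }</pre>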
 */
@InterfaceAudience.Private
public final class MetricsConnection implements StatisticTrackable {

  private static final ConcurrentMap<String, MetricsConnection> METRICS_INSTANCES =
    new ConcurrentHashMap<>();

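  /**
   * Returns the shared MetricsConnection for {@code scope}, creating it on first use and
   * incrementing its connection count on every call. Pair each call with
   * {@link #deleteMetricsConnection(String)} so the underlying reporter can be shut down.
   */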
  static MetricsConnection getMetricsConnection(final Configuration conf, final String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    return METRICS_INSTANCES.compute(scope, (s, metricsConnection) -> {
      if (metricsConnection == null) {
        MetricsConnection newMetricsConn = new MetricsConnection(conf, scope, batchPool, metaPool);
        newMetricsConn.incrConnectionCount();
        return newMetricsConn;
      } else {
        metricsConnection.addThreadPools(batchPool, metaPool);
        metricsConnection.incrConnectionCount();
        return metricsConnection;
      }
    });
  }

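  /**
   * Decrements the connection count for {@code scope}, shutting down and removing the
   * MetricsConnection once no connections remain.
   */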
  static void deleteMetricsConnection(final String scope) {
    METRICS_INSTANCES.computeIfPresent(scope, (s, metricsConnection) -> {
      metricsConnection.decrConnectionCount();
      if (metricsConnection.getConnectionCount() == 0) {
        metricsConnection.shutdown();
        return null;
      }
      return metricsConnection;
    });
  }

  /** Set this key to {@code true} to enable metrics collection of client requests. */
  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

  /** Set this key to {@code true} to enable per-table metrics collection for client requests. */
  public static final String CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY =
    "hbase.client.table.metrics.enable";

  /**
   * Set to specify a custom scope for the metrics published through {@link MetricsConnection}. The
   * scope is added to the JMX MBean objectName, and defaults to a combination of the Connection's
   * clusterId and hashCode. For example, a default value for a connection to cluster "foo" might be
   * "foo@7d9d0818", where "7d9d0818" is the hashCode of the underlying AsyncConnectionImpl. Users
   * may set this key to give a more contextual name for this scope. For example, one might want to
   * differentiate a read connection from a write connection by setting the scopes to "foo-read" and
   * "foo-write" respectively. Scope is the only thing that lends any uniqueness to the metrics.
   * Care should be taken to avoid using the same scope for multiple Connections, otherwise the
   * metrics may aggregate in unforeseen ways.
   */
  public static final String METRICS_SCOPE_KEY = "hbase.client.metrics.scope";

  /**
   * Returns the scope for a MetricsConnection based on the configured {@link #METRICS_SCOPE_KEY} or
   * by generating a default from the passed clusterId and connectionObj's hashCode.
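   * <p>
   * For example, per the implementation below, clusterId "foo" and a hashCode of 0x7d9d0818 yield
   * the default scope "foo@7d9d0818".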
   * @param conf          configuration for the connection
   * @param clusterId     clusterId for the connection
   * @param connectionObj either a Connection or AsyncConnectionImpl, the instance creating this
   *                      MetricsConnection.
   */
  static String getScope(Configuration conf, String clusterId, Object connectionObj) {
    return conf.get(METRICS_SCOPE_KEY,
      clusterId + "@" + Integer.toHexString(connectionObj.hashCode()));
  }

  private static final String CNT_BASE = "rpcCount_";
  private static final String FAILURE_CNT_BASE = "rpcFailureCount_";
  private static final String TOTAL_EXCEPTION_CNT = "rpcTotalExceptions";
  private static final String LOCAL_EXCEPTION_CNT_BASE = "rpcLocalExceptions_";
  private static final String REMOTE_EXCEPTION_CNT_BASE = "rpcRemoteExceptions_";
  private static final String DRTN_BASE = "rpcCallDurationMs_";
  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
  private static final String MEMLOAD_BASE = "memstoreLoad_";
  private static final String HEAP_BASE = "heapOccupancy_";
  private static final String CACHE_BASE = "cacheDroppingExceptions_";
  private static final String UNKNOWN_EXCEPTION = "UnknownException";
  private static final String NS_LOOKUPS = "nsLookups";
  private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

  /** A container class for collecting details about the RPC call as it percolates. */
  public static class CallStats {
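    // Assumed lifecycle, based on the setters below and newCallStats(): the RPC layer creates an
    // instance via MetricsConnection.newCallStats(), records the request size and start time
    // before issuing the call, records the response size and call time once it completes, and
    // finally hands the instance to updateRpc(...) for publication.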
    private long requestSizeBytes = 0;
    private long responseSizeBytes = 0;
    private long startTime = 0;
    private long callTimeMs = 0;
    private int concurrentCallsPerServer = 0;
    private int numActionsPerServer = 0;

    public long getRequestSizeBytes() {
      return requestSizeBytes;
    }

    public void setRequestSizeBytes(long requestSizeBytes) {
      this.requestSizeBytes = requestSizeBytes;
    }

    public long getResponseSizeBytes() {
      return responseSizeBytes;
    }

    public void setResponseSizeBytes(long responseSizeBytes) {
      this.responseSizeBytes = responseSizeBytes;
    }

    public long getStartTime() {
      return startTime;
    }

    public void setStartTime(long startTime) {
      this.startTime = startTime;
    }

    public long getCallTimeMs() {
      return callTimeMs;
    }

    public void setCallTimeMs(long callTimeMs) {
      this.callTimeMs = callTimeMs;
    }

    public int getConcurrentCallsPerServer() {
      return concurrentCallsPerServer;
    }

    public void setConcurrentCallsPerServer(int callsPerServer) {
      this.concurrentCallsPerServer = callsPerServer;
    }

    public int getNumActionsPerServer() {
      return numActionsPerServer;
    }

    public void setNumActionsPerServer(int numActionsPerServer) {
      this.numActionsPerServer = numActionsPerServer;
    }
  }

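  /**
   * Tracks per-call-type RPC metrics: a duration timer plus request and response size histograms.
   * As an illustration derived from the constants above, the "Mutate"/"Put" tracker registers a
   * timer and histograms whose names include "rpcCallDurationMs_ClientService_Mutate(Put)",
   * "rpcCallRequestSizeBytes_ClientService_Mutate(Put)" and
   * "rpcCallResponseSizeBytes_ClientService_Mutate(Put)", qualified by the connection scope.
   */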
  protected static final class CallTracker {
    private final String name;
    final Timer callTimer;
    final Histogram reqHist;
    final Histogram respHist;

    private CallTracker(MetricRegistry registry, String name, String subName, String scope) {
      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
      if (subName != null) {
        sb.append("(").append(subName).append(")");
      }
      this.name = sb.toString();
      this.callTimer = registry.timer(name(MetricsConnection.class, DRTN_BASE + this.name, scope));
      this.reqHist = registry.histogram(name(MetricsConnection.class, REQ_BASE + this.name, scope));
      this.respHist =
        registry.histogram(name(MetricsConnection.class, RESP_BASE + this.name, scope));
    }

    private CallTracker(MetricRegistry registry, String name, String scope) {
      this(registry, name, null, scope);
    }

    public void updateRpc(CallStats stats) {
      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
      this.reqHist.update(stats.getRequestSizeBytes());
      this.respHist.update(stats.getResponseSizeBytes());
    }

    @Override
    public String toString() {
      return "CallTracker:" + name;
    }
  }

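  /**
   * Histograms of per-region load statistics (memstore load and heap occupancy) reported back to
   * the client.
   */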
  protected static class RegionStats {
    final String name;
    final Histogram memstoreLoadHist;
    final Histogram heapOccupancyHist;

    public RegionStats(MetricRegistry registry, String name) {
      this.name = name;
      this.memstoreLoadHist =
        registry.histogram(name(MetricsConnection.class, MEMLOAD_BASE + this.name));
      this.heapOccupancyHist =
        registry.histogram(name(MetricsConnection.class, HEAP_BASE + this.name));
    }

    public void update(RegionLoadStats regionStatistics) {
      this.memstoreLoadHist.update(regionStatistics.getMemStoreLoad());
      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
    }
  }

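  /**
   * Counters for request runners (presumably the asynchronous request submission machinery): how
   * many ran immediately, how many were delayed, and a histogram of the delay intervals.
   */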
  protected static class RunnerStats {
    final Counter normalRunners;
    final Counter delayRunners;
    final Histogram delayIntervalHist;

    public RunnerStats(MetricRegistry registry) {
      this.normalRunners = registry.counter(name(MetricsConnection.class, "normalRunnersCount"));
      this.delayRunners = registry.counter(name(MetricsConnection.class, "delayRunnersCount"));
      this.delayIntervalHist =
        registry.histogram(name(MetricsConnection.class, "delayIntervalHist"));
    }

    public void incrNormalRunners() {
      this.normalRunners.inc();
    }

    public void incrDelayRunners() {
      this.delayRunners.inc();
    }

    public void updateDelayInterval(long interval) {
      this.delayIntervalHist.update(interval);
    }
  }

  private ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats =
    new ConcurrentHashMap<>();

  public void updateServerStats(ServerName serverName, byte[] regionName, Object r) {
    if (!(r instanceof Result)) {
      return;
    }
    Result result = (Result) r;
    RegionLoadStats stats = result.getStats();
    if (stats == null) {
      return;
    }
    updateRegionStats(serverName, regionName, stats);
  }

  @Override
  public void updateRegionStats(ServerName serverName, byte[] regionName, RegionLoadStats stats) {
    String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
    ConcurrentMap<byte[], RegionStats> rsStats = computeIfAbsent(serverStats, serverName,
      () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    RegionStats regionStats =
      computeIfAbsent(rsStats, regionName, () -> new RegionStats(this.registry, name));
    regionStats.update(stats);
  }

  /** A functional interface for dispatching to the appropriate metric factory method. */
  private interface NewMetric<T> {
    T newMetric(Class<?> clazz, String name, String scope);
  }

  /** Anticipated number of metric entries */
  private static final int CAPACITY = 50;
  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
  private static final float LOAD_FACTOR = 0.75f;
  /** Anticipated number of concurrent accessor threads */
  private static final int CONCURRENCY_LEVEL = 256;

  private final MetricRegistry registry;
  private final JmxReporter reporter;
  private final String scope;
  private final boolean tableMetricsEnabled;

  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
    @Override
    public Timer newMetric(Class<?> clazz, String name, String scope) {
      return registry.timer(name(clazz, name, scope));
    }
  };

  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
    @Override
    public Histogram newMetric(Class<?> clazz, String name, String scope) {
      return registry.histogram(name(clazz, name, scope));
    }
  };

  private final NewMetric<Counter> counterFactory = new NewMetric<Counter>() {
    @Override
    public Counter newMetric(Class<?> clazz, String name, String scope) {
      return registry.counter(name(clazz, name, scope));
    }
  };

  // Thread pools, one entry per connection sharing this metrics instance.
  private final List<Supplier<ThreadPoolExecutor>> batchPools = new ArrayList<>();
  private final List<Supplier<ThreadPoolExecutor>> metaPools = new ArrayList<>();

  // static metrics

  private final Counter connectionCount;
  private final Counter metaCacheHits;
  private final Counter metaCacheMisses;
  private final CallTracker getTracker;
  private final CallTracker scanTracker;
  private final CallTracker appendTracker;
  private final CallTracker deleteTracker;
  private final CallTracker incrementTracker;
  private final CallTracker putTracker;
  private final CallTracker multiTracker;
  private final RunnerStats runnerStats;
  private final Counter metaCacheNumClearServer;
  private final Counter metaCacheNumClearRegion;
  private final Counter hedgedReadOps;
  private final Counter hedgedReadWin;
  private final Histogram concurrentCallsPerServerHist;
  private final Histogram numActionsPerServerHist;
  private final Counter nsLookups;
  private final Counter nsLookupsFailed;
  private final Timer overloadedBackoffTimer;
  private final Counter userRegionLockTimeoutCount;
  private final Timer userRegionLockWaitingTimer;
  private final Timer userRegionLockHeldTimer;
  private final Histogram userRegionLockQueueHist;

  // dynamic metrics

  // These maps are used to cache references to the metric instances that are managed by the
  // registry. I don't think their use perfectly removes redundant allocations, but it's
  // a big improvement over calling registry.newMetric each time.
  private final ConcurrentMap<String, Timer> rpcTimers =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Histogram> rpcHistograms = new ConcurrentHashMap<>(
    CAPACITY * 2 /* tracking both request and response sizes */, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
  private final ConcurrentMap<String, Counter> rpcCounters =
    new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

  private MetricsConnection(Configuration conf, String scope,
    Supplier<ThreadPoolExecutor> batchPool, Supplier<ThreadPoolExecutor> metaPool) {
    this.scope = scope;
    this.tableMetricsEnabled = conf.getBoolean(CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, false);
    addThreadPools(batchPool, metaPool);
    this.registry = new MetricRegistry();
    this.registry.register(getExecutorPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : batchPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /*
             * Keep the largest activeCount / maxPoolSize ratio among the batch pools of all
             * connections; the cross-multiplied comparison avoids division.
             */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.registry.register(getMetaPoolName(), new RatioGauge() {
      @Override
      protected Ratio getRatio() {
        int numerator = 0;
        int denominator = 0;
        for (Supplier<ThreadPoolExecutor> poolSupplier : metaPools) {
          ThreadPoolExecutor pool = poolSupplier.get();
          if (pool != null) {
            int activeCount = pool.getActiveCount();
            int maxPoolSize = pool.getMaximumPoolSize();
            /*
             * Keep the largest activeCount / maxPoolSize ratio among the meta lookup pools of all
             * connections; the cross-multiplied comparison avoids division.
             */
            if (numerator == 0 || (numerator * maxPoolSize) < (activeCount * denominator)) {
              numerator = activeCount;
              denominator = maxPoolSize;
            }
          }
        }
        return Ratio.of(numerator, denominator);
      }
    });
    this.connectionCount = registry.counter(name(this.getClass(), "connectionCount", scope));
    this.metaCacheHits = registry.counter(name(this.getClass(), "metaCacheHits", scope));
    this.metaCacheMisses = registry.counter(name(this.getClass(), "metaCacheMisses", scope));
    this.metaCacheNumClearServer =
      registry.counter(name(this.getClass(), "metaCacheNumClearServer", scope));
    this.metaCacheNumClearRegion =
      registry.counter(name(this.getClass(), "metaCacheNumClearRegion", scope));
    this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
    this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
    this.getTracker = new CallTracker(this.registry, "Get", scope);
    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
    this.runnerStats = new RunnerStats(this.registry);
    this.concurrentCallsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "concurrentCallsPerServer", scope));
    this.numActionsPerServerHist =
      registry.histogram(name(MetricsConnection.class, "numActionsPerServer", scope));
    this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
    this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

    this.userRegionLockTimeoutCount =
      registry.counter(name(this.getClass(), "userRegionLockTimeoutCount", scope));
    this.userRegionLockWaitingTimer =
      registry.timer(name(this.getClass(), "userRegionLockWaitingDuration", scope));
    this.userRegionLockHeldTimer =
      registry.timer(name(this.getClass(), "userRegionLockHeldDuration", scope));
    this.userRegionLockQueueHist =
      registry.histogram(name(MetricsConnection.class, "userRegionLockQueueLength", scope));

    this.overloadedBackoffTimer =
      registry.timer(name(this.getClass(), "overloadedBackoffDurationMs", scope));

    this.reporter = JmxReporter.forRegistry(this.registry).build();
    this.reporter.start();
  }

  final String getExecutorPoolName() {
    return name(getClass(), "executorPoolActiveThreads", scope);
  }

  final String getMetaPoolName() {
    return name(getClass(), "metaPoolActiveThreads", scope);
  }

  MetricRegistry getMetricRegistry() {
    return registry;
  }

  /** scope of the metrics object */
  public String getMetricScope() {
    return scope;
  }

  /** serverStats metric */
  public ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> getServerStats() {
    return serverStats;
  }

  /** runnerStats metric */
  public RunnerStats getRunnerStats() {
    return runnerStats;
  }

  /** metaCacheNumClearServer metric */
  public Counter getMetaCacheNumClearServer() {
    return metaCacheNumClearServer;
  }

  /** metaCacheNumClearRegion metric */
  public Counter getMetaCacheNumClearRegion() {
    return metaCacheNumClearRegion;
  }

  /** hedgedReadOps metric */
  public Counter getHedgedReadOps() {
    return hedgedReadOps;
  }

  /** hedgedReadWin metric */
  public Counter getHedgedReadWin() {
    return hedgedReadWin;
  }

  /** numActionsPerServerHist metric */
  public Histogram getNumActionsPerServerHist() {
    return numActionsPerServerHist;
  }

  /** rpcCounters metric */
  public ConcurrentMap<String, Counter> getRpcCounters() {
    return rpcCounters;
  }

  /** rpcTimers metric */
  public ConcurrentMap<String, Timer> getRpcTimers() {
    return rpcTimers;
  }

  /** rpcHistograms metric */
  public ConcurrentMap<String, Histogram> getRpcHistograms() {
    return rpcHistograms;
  }

  /** getTracker metric */
  public CallTracker getGetTracker() {
    return getTracker;
  }

  /** scanTracker metric */
  public CallTracker getScanTracker() {
    return scanTracker;
  }

  /** multiTracker metric */
  public CallTracker getMultiTracker() {
    return multiTracker;
  }

  /** appendTracker metric */
  public CallTracker getAppendTracker() {
    return appendTracker;
  }

  /** deleteTracker metric */
  public CallTracker getDeleteTracker() {
    return deleteTracker;
  }

  /** incrementTracker metric */
  public CallTracker getIncrementTracker() {
    return incrementTracker;
  }

  /** putTracker metric */
  public CallTracker getPutTracker() {
    return putTracker;
  }

  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
  public static CallStats newCallStats() {
    // TODO: instance pool to reduce GC?
    return new CallStats();
  }

  /** Increment the number of meta cache hits. */
  public void incrMetaCacheHit() {
    metaCacheHits.inc();
  }

  /** Increment the number of meta cache misses. */
  public void incrMetaCacheMiss() {
    metaCacheMisses.inc();
  }

  public long getMetaCacheMisses() {
    return metaCacheMisses.getCount();
  }

  /** Increment the number of meta cache drops requested for an entire RegionServer. */
  public void incrMetaCacheNumClearServer() {
    metaCacheNumClearServer.inc();
  }

  /** Increment the number of meta cache drops requested for an individual region. */
  public void incrMetaCacheNumClearRegion() {
    metaCacheNumClearRegion.inc();
  }

  /**
   * Increment the number of meta cache drops requested for individual regions by {@code count}.
   */
  public void incrMetaCacheNumClearRegion(int count) {
    metaCacheNumClearRegion.inc(count);
  }

  /** Increment the number of hedged reads that have occurred. */
  public void incrHedgedReadOps() {
    hedgedReadOps.inc();
  }

  /** Increment the number of hedged reads that returned faster than the original read. */
  public void incrHedgedReadWin() {
    hedgedReadWin.inc();
  }

  /** Increment the count of normal runners. */
  public void incrNormalRunners() {
    this.runnerStats.incrNormalRunners();
  }

  /** Increment the count of delay runners and update the delay interval. */
  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
    this.runnerStats.incrDelayRunners();
    this.runnerStats.updateDelayInterval(interval);
  }

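  /** Update the timer tracking how long clients back off when servers are overloaded. */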
  public void incrementServerOverloadedBackoffTime(long time, TimeUnit timeUnit) {
    overloadedBackoffTimer.update(time, timeUnit);
  }

  /** Increment the count of user region lock timeouts. */
  public void incrUserRegionLockTimeout() {
    userRegionLockTimeoutCount.inc();
  }

  /** Return the counter of user region lock timeouts. */
  public Counter getUserRegionLockTimeout() {
    return userRegionLockTimeoutCount;
  }

  /** Return the timer of time spent waiting on the user region lock. */
  public Timer getUserRegionLockWaitingTimer() {
    return userRegionLockWaitingTimer;
  }

  /** Return the timer of time the user region lock is held. */
  public Timer getUserRegionLockHeldTimer() {
    return userRegionLockHeldTimer;
  }

  /** Return the histogram of user region lock queue lengths. */
  public Histogram getUserRegionLockQueue() {
    return userRegionLockQueueHist;
  }

  /** Update the time spent waiting on the user region lock. */
  public void updateUserRegionLockWaiting(long duration) {
    userRegionLockWaitingTimer.update(duration, TimeUnit.MILLISECONDS);
  }

  /** Update the time the user region lock was held. */
  public void updateUserRegionLockHeld(long duration) {
    userRegionLockHeldTimer.update(duration, TimeUnit.MILLISECONDS);
  }

  /** Update the histogram of user region lock queue lengths. */
  public void updateUserRegionLockQueue(int count) {
    userRegionLockQueueHist.update(count);
  }

  /** Return the connection count of the metrics within a scope. */
  public long getConnectionCount() {
    return connectionCount.getCount();
  }

  /** Increment the connection count of the metrics within a scope. */
  private void incrConnectionCount() {
    connectionCount.inc();
  }

  /** Decrement the connection count of the metrics within a scope. */
  private void decrConnectionCount() {
    connectionCount.dec();
  }

  /** Add the thread pools of an additional connection to the metrics. */
  private void addThreadPools(Supplier<ThreadPoolExecutor> batchPool,
    Supplier<ThreadPoolExecutor> metaPool) {
    batchPools.add(batchPool);
    metaPools.add(metaPool);
  }

  /**
   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
   */
  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
    return computeIfAbsent(map, key, () -> factory.newMetric(getClass(), key, scope));
  }

  /** Update call stats for non-critical-path methods. */
  private void updateRpcGeneric(String methodName, CallStats stats) {
    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory).update(stats.getCallTimeMs(),
      TimeUnit.MILLISECONDS);
    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getRequestSizeBytes());
    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
      .update(stats.getResponseSizeBytes());
  }

  private void shutdown() {
    this.reporter.stop();
  }

  /** Report RPC context to the metrics system. */
  public void updateRpc(MethodDescriptor method, TableName tableName, Message param,
    CallStats stats, Throwable e) {
    int callsPerServer = stats.getConcurrentCallsPerServer();
    if (callsPerServer > 0) {
      concurrentCallsPerServerHist.update(callsPerServer);
    }
    // Update the counter that tracks RPCs by type.
    StringBuilder methodName = new StringBuilder();
    methodName.append(method.getService().getName()).append("_").append(method.getName());
    // Distinguish mutate types.
    if ("Mutate".equals(method.getName())) {
      final MutationType type = ((MutateRequest) param).getMutation().getMutateType();
      switch (type) {
        case APPEND:
          methodName.append("(Append)");
          break;
        case DELETE:
          methodName.append("(Delete)");
          break;
        case INCREMENT:
          methodName.append("(Increment)");
          break;
        case PUT:
          methodName.append("(Put)");
          break;
        default:
          methodName.append("(Unknown)");
      }
    }
    getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
    if (e != null) {
      getMetric(FAILURE_CNT_BASE + methodName, rpcCounters, counterFactory).inc();
      getMetric(TOTAL_EXCEPTION_CNT, rpcCounters, counterFactory).inc();
      if (e instanceof RemoteException) {
        String fullClassName = ((RemoteException) e).getClassName();
        String simpleClassName = (fullClassName != null)
          ? fullClassName.substring(fullClassName.lastIndexOf(".") + 1)
          : "unknown";
        getMetric(REMOTE_EXCEPTION_CNT_BASE + simpleClassName, rpcCounters, counterFactory).inc();
      } else {
        getMetric(LOCAL_EXCEPTION_CNT_BASE + e.getClass().getSimpleName(), rpcCounters,
          counterFactory).inc();
      }
    }
    // This implementation is tied directly to protobuf implementation details. It would be better
    // if we could dispatch based on something static, i.e. the request Message type.
    if (method.getService() == ClientService.getDescriptor()) {
      switch (method.getIndex()) {
        case 0:
          assert "Get".equals(method.getName());
          getTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 1:
          assert "Mutate".equals(method.getName());
          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
          switch (mutationType) {
            case APPEND:
              appendTracker.updateRpc(stats);
              break;
            case DELETE:
              deleteTracker.updateRpc(stats);
              break;
            case INCREMENT:
              incrementTracker.updateRpc(stats);
              break;
            case PUT:
              putTracker.updateRpc(stats);
              break;
            default:
              throw new RuntimeException("Unrecognized mutation type " + mutationType);
          }
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 2:
          assert "Scan".equals(method.getName());
          scanTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        case 3:
          assert "BulkLoadHFile".equals(method.getName());
          // use generic implementation
          break;
        case 4:
          assert "PrepareBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 5:
          assert "CleanupBulkLoad".equals(method.getName());
          // use generic implementation
          break;
        case 6:
          assert "ExecService".equals(method.getName());
          // use generic implementation
          break;
        case 7:
          assert "ExecRegionServerService".equals(method.getName());
          // use generic implementation
          break;
        case 8:
          assert "Multi".equals(method.getName());
          numActionsPerServerHist.update(stats.getNumActionsPerServer());
          multiTracker.updateRpc(stats);
          updateTableMetric(methodName.toString(), tableName, stats, e);
          return;
        default:
          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
      }
    }
    // Fallback to dynamic registry lookup for DDL methods.
    updateRpcGeneric(methodName.toString(), stats);
  }

  /** Report table RPC context to the metrics system. */
  private void updateTableMetric(String methodName, TableName tableName, CallStats stats,
    Throwable e) {
    if (tableMetricsEnabled) {
      if (methodName != null) {
        String table = tableName != null && StringUtils.isNotEmpty(tableName.getNameAsString())
          ? tableName.getNameAsString()
          : "unknown";
        String metricKey = methodName + "_" + table;
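        // e.g. "ClientService_Get" on table "t1" yields the key "ClientService_Get_t1".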
        // Update the table RPC context in the metrics system: RPC call duration and RPC call
        // request/response sizes (bytes).
        updateRpcGeneric(metricKey, stats);
        if (e != null) {
          // Count RPC call failures keyed by table name.
          getMetric(FAILURE_CNT_BASE + metricKey, rpcCounters, counterFactory).inc();
        }
      }
    }
  }

  public void incrCacheDroppingExceptions(Object exception) {
    getMetric(
      CACHE_BASE + (exception == null ? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
      cacheDroppingExceptions, counterFactory).inc();
  }

  public void incrNsLookups() {
    this.nsLookups.inc();
  }

  public void incrNsLookupsFailed() {
    this.nsLookupsFailed.inc();
  }
}