001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import com.codahale.metrics.Counter;
025import com.codahale.metrics.RatioGauge;
026import com.codahale.metrics.RatioGauge.Ratio;
027import com.codahale.metrics.Timer;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.List;
032import java.util.Optional;
033import java.util.concurrent.Executors;
034import java.util.concurrent.ThreadPoolExecutor;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.ipc.CallTimeoutException;
039import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MetricsTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.After;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.runner.RunWith;
051import org.junit.runners.Parameterized;
052import org.junit.runners.Parameterized.Parameter;
053import org.junit.runners.Parameterized.Parameters;
054
055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
056
057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
067
068@RunWith(Parameterized.class)
069@Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
070public class TestMetricsConnection {
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestMetricsConnection.class);
074
075  private static final Configuration conf = new Configuration();
076  private static MetricsConnection METRICS;
077  private static final ThreadPoolExecutor BATCH_POOL =
078    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
079
080  private static final String MOCK_CONN_STR = "mocked-connection";
081
082  @Parameter()
083  public boolean tableMetricsEnabled;
084
085  @Parameters
086  public static List<Boolean> params() {
087    return Arrays.asList(false, true);
088  }
089
090  @Before
091  public void before() {
092    conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled);
093    METRICS =
094      MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
095  }
096
097  @After
098  public void after() {
099    MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
100  }
101
102  @Test
103  public void testMetricsConnectionScope() throws IOException {
104    Configuration conf = new Configuration();
105    String clusterId = "foo";
106    String scope = "testScope";
107    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
108
109    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
110    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
111    assertTrue("Metrics should be present", metrics.isPresent());
112    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
113      metrics.get().getMetricScope());
114    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
115    impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
116
117    metrics = impl.getConnectionMetrics();
118    assertTrue("Metrics should be present", metrics.isPresent());
119    assertEquals(scope, metrics.get().getMetricScope());
120  }
121
122  @Test
123  public void testMetricsWithMultiConnections() throws IOException {
124    Configuration conf = new Configuration();
125    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
126    conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
127
128    User user = User.getCurrent();
129
130    /* create multiple connections */
131    final int num = 3;
132    AsyncConnectionImpl impl;
133    List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
134    for (int i = 0; i < num; i++) {
135      impl = new AsyncConnectionImpl(conf, null, null, null, user);
136      connList.add(impl);
137    }
138
139    /* verify metrics presence */
140    impl = connList.get(0);
141    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
142    assertTrue("Metrics should be present", metrics.isPresent());
143
144    /* verify connection count in a shared metrics */
145    long count = metrics.get().getConnectionCount();
146    assertEquals("Failed to verify connection count." + count, count, num);
147
148    /* close some connections */
149    for (int i = 0; i < num - 1; i++) {
150      connList.get(i).close();
151    }
152
153    /* verify metrics presence again */
154    impl = connList.get(num - 1);
155    metrics = impl.getConnectionMetrics();
156    assertTrue("Metrics should be present after some of connections are closed.",
157      metrics.isPresent());
158
159    /* verify count of remaining connections */
160    count = metrics.get().getConnectionCount();
161    assertEquals("Connection count suppose to be 1 but got: " + count, count, 1);
162
163    /* shutdown */
164    impl.close();
165  }
166
167  @Test
168  public void testStaticMetrics() throws IOException {
169    final byte[] foo = Bytes.toBytes("foo");
170    String table = "TableX";
171    final RegionSpecifier region = RegionSpecifier.newBuilder()
172      .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build();
173    final int loop = 5;
174
175    for (int i = 0; i < loop; i++) {
176      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
177        TableName.valueOf(table),
178        GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(),
179        MetricsConnection.newCallStats(), null);
180      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
181        TableName.valueOf(table),
182        ScanRequest.newBuilder().setRegion(region)
183          .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(),
184        MetricsConnection.newCallStats(),
185        new RemoteWithExtrasException("java.io.IOException", null, false, false));
186      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
187        TableName.valueOf(table),
188        MultiRequest.newBuilder()
189          .addRegionAction(ClientProtos.RegionAction.newBuilder()
190            .addAction(
191              ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build())
192            .setRegion(region).build())
193          .build(),
194        MetricsConnection.newCallStats(),
195        new CallTimeoutException("test with CallTimeoutException"));
196      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
197        TableName.valueOf(table),
198        MutateRequest.newBuilder()
199          .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
200          .setRegion(region).build(),
201        MetricsConnection.newCallStats(), null);
202      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
203        TableName.valueOf(table),
204        MutateRequest.newBuilder()
205          .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
206          .setRegion(region).build(),
207        MetricsConnection.newCallStats(), null);
208      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
209        TableName.valueOf(table),
210        MutateRequest.newBuilder()
211          .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
212          .setRegion(region).build(),
213        MetricsConnection.newCallStats(), null);
214      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
215        TableName.valueOf(table),
216        MutateRequest.newBuilder()
217          .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region)
218          .build(),
219        MetricsConnection.newCallStats(),
220        new CallTimeoutException("test with CallTimeoutException"));
221    }
222
223    testRpcCallMetrics(table, loop);
224
225    String metricKey;
226    long metricVal;
227    Counter counter;
228
229    // remote exception
230    metricKey = "rpcRemoteExceptions_IOException";
231    counter = METRICS.getRpcCounters().get(metricKey);
232    metricVal = (counter != null) ? counter.getCount() : 0;
233    assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop);
234
235    // local exception
236    metricKey = "rpcLocalExceptions_CallTimeoutException";
237    counter = METRICS.getRpcCounters().get(metricKey);
238    metricVal = (counter != null) ? counter.getCount() : 0;
239    assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 2);
240
241    // total exception
242    metricKey = "rpcTotalExceptions";
243    counter = METRICS.getRpcCounters().get(metricKey);
244    metricVal = (counter != null) ? counter.getCount() : 0;
245    assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 3);
246
247    testRpcCallTableMetrics(table, loop);
248
249    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
250      METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
251      METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
252      METRICS.getPutTracker() }) {
253      assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
254      assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
255      assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());
256    }
257    RatioGauge executorMetrics =
258      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName());
259    RatioGauge metaMetrics =
260      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName());
261    assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
262    assertEquals(Double.NaN, metaMetrics.getValue(), 0);
263  }
264
265  private void testRpcCallTableMetrics(String table, int expectedVal) {
266    String metricKey;
267    Timer timer;
268    String numOpsSuffix = "_num_ops";
269    String p95Suffix = "_95th_percentile";
270    String p99Suffix = "_99th_percentile";
271    String service = ClientService.getDescriptor().getName();
272    for (String m : new String[] { "Get", "Scan", "Multi" }) {
273      metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table;
274      timer = METRICS.getRpcTimers().get(metricKey);
275      if (tableMetricsEnabled) {
276        long numOps = timer.getCount();
277        double p95 = timer.getSnapshot().get95thPercentile();
278        double p99 = timer.getSnapshot().get99thPercentile();
279        assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal,
280          numOps);
281        assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
282        assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
283      } else {
284        assertNull(timer);
285      }
286    }
287
288    // Distinguish mutate types for mutate method.
289    String mutateMethod = "Mutate";
290    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
291      metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")"
292        + "_" + table;
293      timer = METRICS.getRpcTimers().get(metricKey);
294      if (tableMetricsEnabled) {
295        long numOps = timer.getCount();
296        double p95 = timer.getSnapshot().get95thPercentile();
297        double p99 = timer.getSnapshot().get99thPercentile();
298        assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal,
299          numOps);
300        assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
301        assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
302      } else {
303        assertNull(timer);
304      }
305    }
306  }
307
308  private void testRpcCallMetrics(String table, int expectedVal) {
309    final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_";
310    final String rpcFailureCountPrefix =
311      "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_";
312    String metricKey;
313    long metricVal;
314    Counter counter;
315
316    for (String method : new String[] { "Get", "Scan", "Multi" }) {
317      // rpc call count
318      metricKey = rpcCountPrefix + method;
319      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
320      assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
321
322      // rpc failure call
323      metricKey = tableMetricsEnabled
324        ? rpcFailureCountPrefix + method + "_" + table
325        : rpcFailureCountPrefix + method;
326      counter = METRICS.getRpcCounters().get(metricKey);
327      metricVal = (counter != null) ? counter.getCount() : 0;
328      if (method.equals("Get")) {
329        // no failure
330        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal);
331      } else {
332        // has failure
333        assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
334      }
335    }
336
337    String method = "Mutate";
338    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
339      // rpc call count
340      metricKey = rpcCountPrefix + method + "(" + mutationType + ")";
341      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
342      assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
343
344      // rpc failure call
345      metricKey = tableMetricsEnabled
346        ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table
347        : rpcFailureCountPrefix + method + "(" + mutationType + ")";
348      counter = METRICS.getRpcCounters().get(metricKey);
349      metricVal = (counter != null) ? counter.getCount() : 0;
350      if (mutationType.equals("Put")) {
351        // has failure
352        assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
353      } else {
354        // no failure
355        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal);
356      }
357    }
358  }
359}