001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertNull; 023import static org.junit.Assert.assertTrue; 024 025import com.codahale.metrics.Counter; 026import com.codahale.metrics.RatioGauge; 027import com.codahale.metrics.RatioGauge.Ratio; 028import com.codahale.metrics.Timer; 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.List; 033import java.util.Optional; 034import java.util.concurrent.CompletableFuture; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadPoolExecutor; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.ipc.CallTimeoutException; 041import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 042import org.apache.hadoop.hbase.security.User; 043import org.apache.hadoop.hbase.testclassification.ClientTests; 044import org.apache.hadoop.hbase.testclassification.MetricsTests; 045import org.apache.hadoop.hbase.testclassification.SmallTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.After; 048import org.junit.Before; 049import org.junit.ClassRule; 050import org.junit.Test; 051import org.junit.experimental.categories.Category; 052import org.junit.runner.RunWith; 053import org.junit.runners.Parameterized; 054import org.junit.runners.Parameterized.Parameter; 055import org.junit.runners.Parameterized.Parameters; 056import org.mockito.Mockito; 057 058import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 059 060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 070 071@RunWith(Parameterized.class) 072@Category({ ClientTests.class, MetricsTests.class, SmallTests.class }) 073public class TestMetricsConnection { 074 @ClassRule 075 public static final HBaseClassTestRule CLASS_RULE = 076 HBaseClassTestRule.forClass(TestMetricsConnection.class); 077 078 private static final Configuration conf = new Configuration(); 079 private static MetricsConnection METRICS; 080 private static final ThreadPoolExecutor BATCH_POOL = 081 (ThreadPoolExecutor) Executors.newFixedThreadPool(2); 082 083 private static final String MOCK_CONN_STR = "mocked-connection"; 084 085 @Parameter() 086 public boolean tableMetricsEnabled; 087 088 @Parameters 089 public static List<Boolean> params() { 090 return Arrays.asList(false, true); 091 } 092 093 @Before 094 public void before() { 095 conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled); 096 METRICS = 097 MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null); 098 } 099 100 @After 101 public void after() { 102 MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR); 103 } 104 105 @Test 106 public void testMetricsConnectionScopeAsyncClient() throws IOException { 107 Configuration conf = new Configuration(); 108 String clusterId = "foo"; 109 String scope = "testScope"; 110 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 111 112 AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent()); 113 Optional<MetricsConnection> metrics = impl.getConnectionMetrics(); 114 assertTrue("Metrics should be present", metrics.isPresent()); 115 assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), 116 metrics.get().getMetricScope()); 117 conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope); 118 impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent()); 119 120 metrics = impl.getConnectionMetrics(); 121 assertTrue("Metrics should be present", metrics.isPresent()); 122 assertEquals(scope, metrics.get().getMetricScope()); 123 } 124 125 @Test 126 public void testMetricsWithMultiConnections() throws IOException { 127 Configuration conf = new Configuration(); 128 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 129 conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test"); 130 131 User user = User.getCurrent(); 132 133 /* create multiple connections */ 134 final int num = 3; 135 AsyncConnectionImpl impl; 136 List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>(); 137 for (int i = 0; i < num; i++) { 138 impl = new AsyncConnectionImpl(conf, null, null, user); 139 connList.add(impl); 140 } 141 142 /* verify metrics presence */ 143 impl = connList.get(0); 144 Optional<MetricsConnection> metrics = impl.getConnectionMetrics(); 145 assertTrue("Metrics should be present", metrics.isPresent()); 146 147 /* verify connection count in a shared metrics */ 148 long count = metrics.get().getConnectionCount(); 149 assertEquals("Failed to verify connection count." + count, count, num); 150 151 /* close some connections */ 152 for (int i = 0; i < num - 1; i++) { 153 connList.get(i).close(); 154 } 155 156 /* verify metrics presence again */ 157 impl = connList.get(num - 1); 158 metrics = impl.getConnectionMetrics(); 159 assertTrue("Metrics should be present after some of connections are closed.", 160 metrics.isPresent()); 161 162 /* verify count of remaining connections */ 163 count = metrics.get().getConnectionCount(); 164 assertEquals("Connection count suppose to be 1 but got: " + count, count, 1); 165 166 /* shutdown */ 167 impl.close(); 168 } 169 170 @Test 171 public void testMetricsConnectionScopeBlockingClient() throws IOException { 172 Configuration conf = new Configuration(); 173 String clusterId = "foo"; 174 String scope = "testScope"; 175 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 176 177 ConnectionRegistry mockRegistry = Mockito.mock(ConnectionRegistry.class); 178 Mockito.when(mockRegistry.getClusterId()) 179 .thenReturn(CompletableFuture.completedFuture(clusterId)); 180 181 ConnectionImplementation impl = 182 new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry); 183 MetricsConnection metrics = impl.getConnectionMetrics(); 184 assertNotNull("Metrics should be present", metrics); 185 assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.getMetricScope()); 186 conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope); 187 impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry); 188 189 metrics = impl.getConnectionMetrics(); 190 assertNotNull("Metrics should be present", metrics); 191 assertEquals(scope, metrics.getMetricScope()); 192 } 193 194 @Test 195 public void testStaticMetrics() throws IOException { 196 final byte[] foo = Bytes.toBytes("foo"); 197 String table = "TableX"; 198 final RegionSpecifier region = RegionSpecifier.newBuilder() 199 .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build(); 200 final int loop = 5; 201 202 for (int i = 0; i < loop; i++) { 203 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"), 204 TableName.valueOf(table), 205 GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(), 206 MetricsConnection.newCallStats(), null); 207 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"), 208 TableName.valueOf(table), 209 ScanRequest.newBuilder().setRegion(region) 210 .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(), 211 MetricsConnection.newCallStats(), 212 new RemoteWithExtrasException("java.io.IOException", null, false, false)); 213 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"), 214 TableName.valueOf(table), 215 MultiRequest.newBuilder() 216 .addRegionAction(ClientProtos.RegionAction.newBuilder() 217 .addAction( 218 ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build()) 219 .setRegion(region).build()) 220 .build(), 221 MetricsConnection.newCallStats(), 222 new CallTimeoutException("test with CallTimeoutException")); 223 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 224 TableName.valueOf(table), 225 MutateRequest.newBuilder() 226 .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo))) 227 .setRegion(region).build(), 228 MetricsConnection.newCallStats(), null); 229 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 230 TableName.valueOf(table), 231 MutateRequest.newBuilder() 232 .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo))) 233 .setRegion(region).build(), 234 MetricsConnection.newCallStats(), null); 235 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 236 TableName.valueOf(table), 237 MutateRequest.newBuilder() 238 .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo))) 239 .setRegion(region).build(), 240 MetricsConnection.newCallStats(), null); 241 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 242 TableName.valueOf(table), 243 MutateRequest.newBuilder() 244 .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region) 245 .build(), 246 MetricsConnection.newCallStats(), 247 new CallTimeoutException("test with CallTimeoutException")); 248 } 249 250 testRpcCallMetrics(table, loop); 251 252 String metricKey; 253 long metricVal; 254 Counter counter; 255 256 // remote exception 257 metricKey = "rpcRemoteExceptions_IOException"; 258 counter = METRICS.getRpcCounters().get(metricKey); 259 metricVal = (counter != null) ? counter.getCount() : 0; 260 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop); 261 262 // local exception 263 metricKey = "rpcLocalExceptions_CallTimeoutException"; 264 counter = METRICS.getRpcCounters().get(metricKey); 265 metricVal = (counter != null) ? counter.getCount() : 0; 266 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 2); 267 268 // total exception 269 metricKey = "rpcTotalExceptions"; 270 counter = METRICS.getRpcCounters().get(metricKey); 271 metricVal = (counter != null) ? counter.getCount() : 0; 272 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 3); 273 274 testRpcCallTableMetrics(table, loop); 275 276 for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { 277 METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(), 278 METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(), 279 METRICS.getPutTracker() }) { 280 assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount()); 281 assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount()); 282 assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount()); 283 } 284 RatioGauge executorMetrics = 285 (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName()); 286 RatioGauge metaMetrics = 287 (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName()); 288 assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0); 289 assertEquals(Double.NaN, metaMetrics.getValue(), 0); 290 } 291 292 private void testRpcCallTableMetrics(String table, int expectedVal) { 293 String metricKey; 294 Timer timer; 295 String numOpsSuffix = "_num_ops"; 296 String p95Suffix = "_95th_percentile"; 297 String p99Suffix = "_99th_percentile"; 298 String service = ClientService.getDescriptor().getName(); 299 for (String m : new String[] { "Get", "Scan", "Multi" }) { 300 metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table; 301 timer = METRICS.getRpcTimers().get(metricKey); 302 if (tableMetricsEnabled) { 303 long numOps = timer.getCount(); 304 double p95 = timer.getSnapshot().get95thPercentile(); 305 double p99 = timer.getSnapshot().get99thPercentile(); 306 assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, 307 numOps); 308 assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); 309 assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); 310 } else { 311 assertNull(timer); 312 } 313 } 314 315 // Distinguish mutate types for mutate method. 316 String mutateMethod = "Mutate"; 317 for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { 318 metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")" 319 + "_" + table; 320 timer = METRICS.getRpcTimers().get(metricKey); 321 if (tableMetricsEnabled) { 322 long numOps = timer.getCount(); 323 double p95 = timer.getSnapshot().get95thPercentile(); 324 double p99 = timer.getSnapshot().get99thPercentile(); 325 assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, 326 numOps); 327 assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); 328 assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); 329 } else { 330 assertNull(timer); 331 } 332 } 333 } 334 335 private void testRpcCallMetrics(String table, int expectedVal) { 336 final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_"; 337 final String rpcFailureCountPrefix = 338 "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_"; 339 String metricKey; 340 long metricVal; 341 Counter counter; 342 343 for (String method : new String[] { "Get", "Scan", "Multi" }) { 344 // rpc call count 345 metricKey = rpcCountPrefix + method; 346 metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); 347 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 348 349 // rpc failure call 350 metricKey = tableMetricsEnabled 351 ? rpcFailureCountPrefix + method + "_" + table 352 : rpcFailureCountPrefix + method; 353 counter = METRICS.getRpcCounters().get(metricKey); 354 metricVal = (counter != null) ? counter.getCount() : 0; 355 if (method.equals("Get")) { 356 // no failure 357 assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); 358 } else { 359 // has failure 360 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 361 } 362 } 363 364 String method = "Mutate"; 365 for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { 366 // rpc call count 367 metricKey = rpcCountPrefix + method + "(" + mutationType + ")"; 368 metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); 369 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 370 371 // rpc failure call 372 metricKey = tableMetricsEnabled 373 ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table 374 : rpcFailureCountPrefix + method + "(" + mutationType + ")"; 375 counter = METRICS.getRpcCounters().get(metricKey); 376 metricVal = (counter != null) ? counter.getCount() : 0; 377 if (mutationType.equals("Put")) { 378 // has failure 379 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 380 } else { 381 // no failure 382 assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); 383 } 384 } 385 } 386}