001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import com.codahale.metrics.Counter; 025import com.codahale.metrics.RatioGauge; 026import com.codahale.metrics.RatioGauge.Ratio; 027import com.codahale.metrics.Timer; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.List; 032import java.util.Optional; 033import java.util.concurrent.Executors; 034import java.util.concurrent.ThreadPoolExecutor; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.ipc.CallTimeoutException; 039import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MetricsTests; 043import org.apache.hadoop.hbase.testclassification.SmallTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.After; 046import org.junit.Before; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.junit.runner.RunWith; 051import org.junit.runners.Parameterized; 052import org.junit.runners.Parameterized.Parameter; 053import org.junit.runners.Parameterized.Parameters; 054 055import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 056 057import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 067 068@RunWith(Parameterized.class) 069@Category({ ClientTests.class, MetricsTests.class, SmallTests.class }) 070public class TestMetricsConnection { 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestMetricsConnection.class); 074 075 private static final Configuration conf = new Configuration(); 076 private static MetricsConnection METRICS; 077 private static final ThreadPoolExecutor BATCH_POOL = 078 (ThreadPoolExecutor) Executors.newFixedThreadPool(2); 079 080 private static final String MOCK_CONN_STR = "mocked-connection"; 081 082 @Parameter() 083 public boolean tableMetricsEnabled; 084 085 @Parameters 086 public static List<Boolean> params() { 087 return Arrays.asList(false, true); 088 } 089 090 @Before 091 public void before() { 092 conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled); 093 METRICS = 094 MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null); 095 } 096 097 @After 098 public void after() { 099 MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR); 100 } 101 102 @Test 103 public void testMetricsConnectionScope() throws IOException { 104 Configuration conf = new Configuration(); 105 String clusterId = "foo"; 106 String scope = "testScope"; 107 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 108 109 AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent()); 110 Optional<MetricsConnection> metrics = impl.getConnectionMetrics(); 111 assertTrue("Metrics should be present", metrics.isPresent()); 112 assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), 113 metrics.get().getMetricScope()); 114 conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope); 115 impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent()); 116 117 metrics = impl.getConnectionMetrics(); 118 assertTrue("Metrics should be present", metrics.isPresent()); 119 assertEquals(scope, metrics.get().getMetricScope()); 120 } 121 122 @Test 123 public void testMetricsWithMultiConnections() throws IOException { 124 Configuration conf = new Configuration(); 125 conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true); 126 conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test"); 127 128 User user = User.getCurrent(); 129 130 /* create multiple connections */ 131 final int num = 3; 132 AsyncConnectionImpl impl; 133 List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>(); 134 for (int i = 0; i < num; i++) { 135 impl = new AsyncConnectionImpl(conf, null, null, null, user); 136 connList.add(impl); 137 } 138 139 /* verify metrics presence */ 140 impl = connList.get(0); 141 Optional<MetricsConnection> metrics = impl.getConnectionMetrics(); 142 assertTrue("Metrics should be present", metrics.isPresent()); 143 144 /* verify connection count in a shared metrics */ 145 long count = metrics.get().getConnectionCount(); 146 assertEquals("Failed to verify connection count." + count, count, num); 147 148 /* close some connections */ 149 for (int i = 0; i < num - 1; i++) { 150 connList.get(i).close(); 151 } 152 153 /* verify metrics presence again */ 154 impl = connList.get(num - 1); 155 metrics = impl.getConnectionMetrics(); 156 assertTrue("Metrics should be present after some of connections are closed.", 157 metrics.isPresent()); 158 159 /* verify count of remaining connections */ 160 count = metrics.get().getConnectionCount(); 161 assertEquals("Connection count suppose to be 1 but got: " + count, count, 1); 162 163 /* shutdown */ 164 impl.close(); 165 } 166 167 @Test 168 public void testStaticMetrics() throws IOException { 169 final byte[] foo = Bytes.toBytes("foo"); 170 String table = "TableX"; 171 final RegionSpecifier region = RegionSpecifier.newBuilder() 172 .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build(); 173 final int loop = 5; 174 175 for (int i = 0; i < loop; i++) { 176 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"), 177 TableName.valueOf(table), 178 GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(), 179 MetricsConnection.newCallStats(), null); 180 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"), 181 TableName.valueOf(table), 182 ScanRequest.newBuilder().setRegion(region) 183 .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(), 184 MetricsConnection.newCallStats(), 185 new RemoteWithExtrasException("java.io.IOException", null, false, false)); 186 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"), 187 TableName.valueOf(table), 188 MultiRequest.newBuilder() 189 .addRegionAction(ClientProtos.RegionAction.newBuilder() 190 .addAction( 191 ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build()) 192 .setRegion(region).build()) 193 .build(), 194 MetricsConnection.newCallStats(), 195 new CallTimeoutException("test with CallTimeoutException")); 196 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 197 TableName.valueOf(table), 198 MutateRequest.newBuilder() 199 .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo))) 200 .setRegion(region).build(), 201 MetricsConnection.newCallStats(), null); 202 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 203 TableName.valueOf(table), 204 MutateRequest.newBuilder() 205 .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo))) 206 .setRegion(region).build(), 207 MetricsConnection.newCallStats(), null); 208 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 209 TableName.valueOf(table), 210 MutateRequest.newBuilder() 211 .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo))) 212 .setRegion(region).build(), 213 MetricsConnection.newCallStats(), null); 214 METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"), 215 TableName.valueOf(table), 216 MutateRequest.newBuilder() 217 .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region) 218 .build(), 219 MetricsConnection.newCallStats(), 220 new CallTimeoutException("test with CallTimeoutException")); 221 } 222 223 testRpcCallMetrics(table, loop); 224 225 String metricKey; 226 long metricVal; 227 Counter counter; 228 229 // remote exception 230 metricKey = "rpcRemoteExceptions_IOException"; 231 counter = METRICS.getRpcCounters().get(metricKey); 232 metricVal = (counter != null) ? counter.getCount() : 0; 233 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop); 234 235 // local exception 236 metricKey = "rpcLocalExceptions_CallTimeoutException"; 237 counter = METRICS.getRpcCounters().get(metricKey); 238 metricVal = (counter != null) ? counter.getCount() : 0; 239 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 2); 240 241 // total exception 242 metricKey = "rpcTotalExceptions"; 243 counter = METRICS.getRpcCounters().get(metricKey); 244 metricVal = (counter != null) ? counter.getCount() : 0; 245 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 3); 246 247 testRpcCallTableMetrics(table, loop); 248 249 for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { 250 METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(), 251 METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(), 252 METRICS.getPutTracker() }) { 253 assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount()); 254 assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount()); 255 assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount()); 256 } 257 RatioGauge executorMetrics = 258 (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName()); 259 RatioGauge metaMetrics = 260 (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName()); 261 assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0); 262 assertEquals(Double.NaN, metaMetrics.getValue(), 0); 263 } 264 265 private void testRpcCallTableMetrics(String table, int expectedVal) { 266 String metricKey; 267 Timer timer; 268 String numOpsSuffix = "_num_ops"; 269 String p95Suffix = "_95th_percentile"; 270 String p99Suffix = "_99th_percentile"; 271 String service = ClientService.getDescriptor().getName(); 272 for (String m : new String[] { "Get", "Scan", "Multi" }) { 273 metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table; 274 timer = METRICS.getRpcTimers().get(metricKey); 275 if (tableMetricsEnabled) { 276 long numOps = timer.getCount(); 277 double p95 = timer.getSnapshot().get95thPercentile(); 278 double p99 = timer.getSnapshot().get99thPercentile(); 279 assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, 280 numOps); 281 assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); 282 assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); 283 } else { 284 assertNull(timer); 285 } 286 } 287 288 // Distinguish mutate types for mutate method. 289 String mutateMethod = "Mutate"; 290 for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { 291 metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")" 292 + "_" + table; 293 timer = METRICS.getRpcTimers().get(metricKey); 294 if (tableMetricsEnabled) { 295 long numOps = timer.getCount(); 296 double p95 = timer.getSnapshot().get95thPercentile(); 297 double p99 = timer.getSnapshot().get99thPercentile(); 298 assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal, 299 numOps); 300 assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0); 301 assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0); 302 } else { 303 assertNull(timer); 304 } 305 } 306 } 307 308 private void testRpcCallMetrics(String table, int expectedVal) { 309 final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_"; 310 final String rpcFailureCountPrefix = 311 "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_"; 312 String metricKey; 313 long metricVal; 314 Counter counter; 315 316 for (String method : new String[] { "Get", "Scan", "Multi" }) { 317 // rpc call count 318 metricKey = rpcCountPrefix + method; 319 metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); 320 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 321 322 // rpc failure call 323 metricKey = tableMetricsEnabled 324 ? rpcFailureCountPrefix + method + "_" + table 325 : rpcFailureCountPrefix + method; 326 counter = METRICS.getRpcCounters().get(metricKey); 327 metricVal = (counter != null) ? counter.getCount() : 0; 328 if (method.equals("Get")) { 329 // no failure 330 assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); 331 } else { 332 // has failure 333 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 334 } 335 } 336 337 String method = "Mutate"; 338 for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) { 339 // rpc call count 340 metricKey = rpcCountPrefix + method + "(" + mutationType + ")"; 341 metricVal = METRICS.getRpcCounters().get(metricKey).getCount(); 342 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 343 344 // rpc failure call 345 metricKey = tableMetricsEnabled 346 ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table 347 : rpcFailureCountPrefix + method + "(" + mutationType + ")"; 348 counter = METRICS.getRpcCounters().get(metricKey); 349 metricVal = (counter != null) ? counter.getCount() : 0; 350 if (mutationType.equals("Put")) { 351 // has failure 352 assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal); 353 } else { 354 // no failure 355 assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal); 356 } 357 } 358 } 359}