001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.hasItem;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026
027import io.opentelemetry.api.trace.SpanKind;
028import io.opentelemetry.api.trace.StatusCode;
029import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
030import io.opentelemetry.sdk.trace.data.SpanData;
031import java.util.Objects;
032import java.util.Optional;
033import java.util.concurrent.CompletableFuture;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.MatcherPredicate;
039import org.apache.hadoop.hbase.RegionLocations;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.Waiter;
043import org.apache.hadoop.hbase.security.User;
044import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
045import org.hamcrest.Matcher;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051public class TestTracingBase {
052  private static final Logger LOG = LoggerFactory.getLogger(TestTracingBase.class);
053
054  protected static final ServerName MASTER_HOST = ServerName.valueOf("localhost", 16010, 12345);
055  protected static final RegionLocations META_REGION_LOCATION =
056    new RegionLocations(new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, MASTER_HOST));
057
058  protected Configuration conf;
059
060  @ClassRule
061  public static OpenTelemetryRule TRACE_RULE = OpenTelemetryRule.create();
062
063  @Before
064  public void setUp() throws Exception {
065    conf = HBaseConfiguration.create();
066    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
067      RegistryForTracingTest.class.getName());
068    TRACE_RULE.clearSpans();
069  }
070
071  protected void assertTrace(String className, String methodName, ServerName serverName,
072    TableName tableName) {
073    String expectedSpanName = String.format("%s.%s", className, methodName);
074    Waiter.waitFor(conf, 1000,
075      () -> TRACE_RULE.getSpans().stream().anyMatch(span -> span.getName().equals(expectedSpanName)
076        && span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
077    SpanData data = TRACE_RULE.getSpans().stream().filter(s -> s.getName().equals(expectedSpanName))
078      .findFirst().get();
079    assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
080
081    if (serverName != null) {
082      Optional<SpanData> foundServerName =
083        TRACE_RULE.getSpans().stream().filter(s -> s.getName().equals(expectedSpanName))
084          .filter(s -> Objects.equals(serverName.getServerName(),
085            s.getAttributes().get(HBaseSemanticAttributes.SERVER_NAME_KEY)))
086          .findAny();
087      assertTrue(foundServerName.isPresent());
088    }
089
090    if (tableName != null) {
091      assertEquals(tableName.getNamespaceAsString(),
092        data.getAttributes().get(HBaseSemanticAttributes.DB_NAME));
093      assertEquals(tableName.getNameAsString(),
094        data.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
095    }
096  }
097
098  protected SpanData waitSpan(String name) {
099    return waitSpan(hasName(name));
100  }
101
102  protected SpanData waitSpan(Matcher<SpanData> matcher) {
103    Matcher<SpanData> spanLocator = allOf(matcher, hasEnded());
104    try {
105      Waiter.waitFor(conf, 1000, new MatcherPredicate<>("waiting for span",
106        () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
107    } catch (AssertionError e) {
108      LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}",
109        TRACE_RULE.getSpans());
110      throw e;
111    }
112    return TRACE_RULE.getSpans().stream().filter(spanLocator::matches).findFirst()
113      .orElseThrow(AssertionError::new);
114  }
115
116  static class RegistryForTracingTest implements ConnectionRegistry {
117
118    public RegistryForTracingTest(Configuration conf, User user) {
119    }
120
121    @Override
122    public CompletableFuture<RegionLocations> getMetaRegionLocations() {
123      return CompletableFuture.completedFuture(META_REGION_LOCATION);
124    }
125
126    @Override
127    public CompletableFuture<String> getClusterId() {
128      return CompletableFuture.completedFuture(HConstants.CLUSTER_ID_DEFAULT);
129    }
130
131    @Override
132    public CompletableFuture<ServerName> getActiveMaster() {
133      return CompletableFuture.completedFuture(MASTER_HOST);
134    }
135
136    @Override
137    public String getConnectionString() {
138      return "nothing";
139    }
140
141    @Override
142    public void close() {
143
144    }
145  }
146
147}