001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasException; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 025import static org.hamcrest.MatcherAssert.assertThat; 026import static org.hamcrest.Matchers.allOf; 027import static org.hamcrest.Matchers.startsWith; 028 029import io.opentelemetry.api.trace.StatusCode; 030import io.opentelemetry.sdk.trace.data.SpanData; 031import java.util.ArrayList; 032import java.util.List; 033import java.util.function.Supplier; 034import java.util.stream.Collectors; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; 037import org.apache.hadoop.hbase.testclassification.ClientTests; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.hamcrest.Matcher; 040import org.junit.ClassRule; 041import org.junit.experimental.categories.Category; 042import org.junit.runner.RunWith; 043import org.junit.runners.Parameterized; 044import org.junit.runners.Parameterized.Parameter; 045import org.junit.runners.Parameterized.Parameters; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049@RunWith(Parameterized.class) 050@Category({ LargeTests.class, ClientTests.class }) 051public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { 052 private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class); 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestRawAsyncTableScan.class); 057 058 @Parameter(0) 059 public String scanType; 060 061 @Parameter(1) 062 public Supplier<Scan> scanCreater; 063 064 @Parameters(name = "{index}: type={0}") 065 public static List<Object[]> params() { 066 return getScanCreatorParams(); 067 } 068 069 @Override 070 protected Scan createScan() { 071 return scanCreater.get(); 072 } 073 074 @Override 075 protected List<Result> doScan(Scan scan, int closeAfter) throws Exception { 076 TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer(); 077 CONN_RULE.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer); 078 List<Result> results = new ArrayList<>(); 079 // these tests batch settings with the sample data result in each result being 080 // split in two. so we must allow twice the expected results in order to reach 081 // our true limit. see convertFromBatchResult for details. 082 if (closeAfter > 0 && scan.getBatch() > 0) { 083 closeAfter = closeAfter * 2; 084 } 085 for (Result result; (result = scanConsumer.take()) != null;) { 086 results.add(result); 087 if (closeAfter > 0 && results.size() >= closeAfter) { 088 break; 089 } 090 } 091 if (scan.getBatch() > 0) { 092 results = convertFromBatchResult(results); 093 } 094 return results; 095 } 096 097 @Override 098 protected void assertTraceContinuity() { 099 final String parentSpanName = testName.getMethodName(); 100 final Matcher<SpanData> parentSpanMatcher = 101 allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); 102 waitForSpan(parentSpanMatcher); 103 104 if (logger.isDebugEnabled()) { 105 StringTraceRenderer stringTraceRenderer = 106 new StringTraceRenderer(spanStream().collect(Collectors.toList())); 107 stringTraceRenderer.render(logger::debug); 108 } 109 110 final String parentSpanId = 111 spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); 112 113 final Matcher<SpanData> scanOperationSpanMatcher = 114 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 115 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); 116 waitForSpan(scanOperationSpanMatcher); 117 118 final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches) 119 .map(SpanData::getSpanId).findAny().get(); 120 final Matcher<SpanData> onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext"); 121 waitForSpan(onNextMatcher); 122 spanStream().filter(onNextMatcher::matches) 123 .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId))); 124 waitForSpan(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId), 125 hasStatusWithCode(StatusCode.OK), hasEnded())); 126 127 final Matcher<SpanData> onCompleteMatcher = 128 hasName("TracedAdvancedScanResultConsumer#onComplete"); 129 waitForSpan(onCompleteMatcher); 130 spanStream().filter(onCompleteMatcher::matches) 131 .forEach(span -> assertThat(span, allOf(onCompleteMatcher, 132 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); 133 } 134 135 @Override 136 protected void 137 assertTraceError(Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher) { 138 final String parentSpanName = testName.getMethodName(); 139 final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); 140 waitForSpan(parentSpanMatcher); 141 142 if (logger.isDebugEnabled()) { 143 StringTraceRenderer stringTraceRenderer = 144 new StringTraceRenderer(spanStream().collect(Collectors.toList())); 145 stringTraceRenderer.render(logger::debug); 146 } 147 148 final String parentSpanId = 149 spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); 150 151 final Matcher<SpanData> scanOperationSpanMatcher = 152 allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), 153 hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), 154 hasException(exceptionMatcher), hasEnded()); 155 waitForSpan(scanOperationSpanMatcher); 156 final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches) 157 .map(SpanData::getSpanId).findAny().get(); 158 159 final Matcher<SpanData> onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError"); 160 waitForSpan(onCompleteMatcher); 161 spanStream().filter(onCompleteMatcher::matches) 162 .forEach(span -> assertThat(span, allOf(onCompleteMatcher, 163 hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); 164 } 165}