/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.Matcher;
import org.hamcrest.core.IsAnything;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;

/**
 * Checks the spans recorded when operations are issued through the blocking {@link Table} handed
 * out by a {@link ConnectionImplementation}. The connection is a Mockito spy whose region locator,
 * client stub and {@link AsyncProcess} are replaced with mocks, so every call is answered by the
 * canned responses registered in {@link #setUp()}.
 */
@Category({ ClientTests.class, MediumTests.class })
public class TestHTableTracing extends TestTracingBase {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHTableTracing.class);

  private ClientProtos.ClientService.BlockingInterface stub;
  private ConnectionImplementation conn;
  private Table table;

  /**
   * Builds the mocked client stub, the spied connection and the {@link Table} under test. The
   * stub answers are registered before the connection is created, in the order scan, multi,
   * mutate, get.
   */
  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();

    stub = mock(ClientService.BlockingInterface.class);
    stubScanRpc();
    stubMultiRpc();
    stubMutateRpc();
    stubGetRpc();

    conn = spy(new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent(),
      Collections.emptyMap()) {
      @Override
      public RegionLocator getRegionLocator(TableName tableName) throws IOException {
        // Every lookup overload yields a freshly built meta-table region located on MASTER_HOST,
        // regardless of the table or row that was requested.
        Answer<HRegionLocation> metaLocation = (InvocationOnMock invocation) -> new HRegionLocation(
          RegionInfoBuilder.newBuilder(TableName.META_TABLE_NAME).build(), MASTER_HOST);
        RegionLocator locator = mock(HRegionLocator.class);
        doAnswer(metaLocation).when(locator).getRegionLocation(any(byte[].class));
        doAnswer(metaLocation).when(locator).getRegionLocation(any(byte[].class), anyInt());
        doAnswer(metaLocation).when(locator).getRegionLocation(any(byte[].class), anyBoolean());
        doAnswer(metaLocation).when(locator).getRegionLocation(any(byte[].class), anyInt(),
          anyBoolean());
        return locator;
      }

      @Override
      public ClientService.BlockingInterface getClient(ServerName serverName) throws IOException {
        return stub;
      }
    });

    // this setup of AsyncProcess is for MultiResponse
    AsyncRequestFuture finishedFuture = mock(AsyncRequestFuture.class);
    doNothing().when(finishedFuture).waitUntilDone();
    AsyncProcess asyncProcess = mock(AsyncProcess.class);
    doReturn(finishedFuture).when(asyncProcess).submit(any());
    doReturn(asyncProcess).when(conn).getAsyncProcess();

    // setup the table instance
    table = conn.getTable(TableName.META_TABLE_NAME, ForkJoinPool.commonPool());
  }

  /**
   * Registers the answer for {@code scan} calls on the stub. A request without a scanner id gets
   * scanner id 1 and no rows; a close request gets the default response; any other request gets a
   * single cell whose row is the running count of such requests.
   */
  private void stubScanRpc() throws Exception {
    final AtomicInteger scanNextCalled = new AtomicInteger(0);
    doAnswer((InvocationOnMock invocation) -> {
      ScanRequest request = invocation.getArgument(1);
      if (!request.hasScannerId()) {
        return ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
          .setMoreResults(true).build();
      }
      if (request.hasCloseScanner() && request.getCloseScanner()) {
        return ScanResponse.getDefaultInstance();
      }
      Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
        .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())).setFamily(familyBytes())
        .setQualifier(qualifierBytes()).setValue(valueBytes()).build();
      Result row = Result.create(Arrays.asList(cell));
      // a row limit of exactly 1 is the only case reported as having no further results
      boolean moreRows = request.getLimitOfRows() != 1;
      return ScanResponse.newBuilder().setScannerId(1).setTtl(800)
        .addResults(ProtobufUtil.toResult(row)).setMoreResultsInRegion(moreRows)
        .setMoreResults(moreRows).build();
    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class));
  }

  /**
   * Registers the answer for {@code multi} calls on the stub: one region action result that holds
   * a single empty {@link Result}.
   */
  private void stubMultiRpc() throws Exception {
    doAnswer((InvocationOnMock invocation) -> {
      ResultOrException.Builder emptyResult =
        ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()));
      RegionActionResult.Builder regionResult =
        RegionActionResult.newBuilder().addResultOrException(emptyResult);
      return MultiResponse.newBuilder().addRegionActionResult(regionResult).build();
    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class));
  }

  /**
   * Registers the answer for {@code mutate} calls on the stub. An increment is answered with a
   * cell copied from the first qualifier of the first column of the request; every other mutation
   * type is answered with the default response.
   */
  private void stubMutateRpc() throws Exception {
    doAnswer((InvocationOnMock invocation) -> {
      MutateRequest request = invocation.getArgument(1);
      MutationProto mutation = request.getMutation();
      if (mutation.getMutateType() != MutationProto.MutationType.INCREMENT) {
        return MutateResponse.getDefaultInstance();
      }
      ColumnValue column = mutation.getColumnValue(0);
      QualifierValue qualifier = column.getQualifierValue(0);
      Cell echoed = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
        .setRow(mutation.getRow().toByteArray()).setFamily(column.getFamily().toByteArray())
        .setQualifier(qualifier.getQualifier().toByteArray())
        .setValue(qualifier.getValue().toByteArray()).build();
      return MutateResponse.newBuilder()
        .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(echoed)))).build();
    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class));
  }

  /**
   * Registers the answer for {@code get} calls on the stub. The cell is built from the requested
   * row plus the family, qualifier and value of default protobuf instances, and the result is
   * created with its {@code exists} argument set to {@code true}.
   */
  private void stubGetRpc() throws Exception {
    doAnswer((InvocationOnMock invocation) -> {
      GetRequest request = invocation.getArgument(1);
      ClientProtos.Get get = request.getGet();
      ColumnValue defaultColumn = ColumnValue.getDefaultInstance();
      QualifierValue defaultQualifier = QualifierValue.getDefaultInstance();
      Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
        .setRow(get.getRow().toByteArray()).setFamily(defaultColumn.getFamily().toByteArray())
        .setQualifier(defaultQualifier.getQualifier().toByteArray())
        .setValue(defaultQualifier.getValue().toByteArray()).build();
      return GetResponse.newBuilder()
        .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell), true))).build();
    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class));
  }

  /** Closes the connection; the {@code true} flag is Guava's swallowIOException argument. */
  @After
  public void tearDown() throws IOException {
    Closeables.close(conn, true);
  }

  /** Returns a fresh copy of the row key used by every test. */
  private static byte[] rowBytes() {
    return Bytes.toBytes(0);
  }

  /** Returns a fresh copy of the column family used by every test. */
  private static byte[] familyBytes() {
    return Bytes.toBytes("cf");
  }

  /** Returns a fresh copy of the column qualifier used by every test. */
  private static byte[] qualifierBytes() {
    return Bytes.toBytes("cq");
  }

  /** Returns a fresh copy of the cell value used by every test. */
  private static byte[] valueBytes() {
    return Bytes.toBytes("v");
  }

  /** Builds a check-and-mutate that deletes the test row when the test cell equals the value. */
  private static CheckAndMutate checkAndDelete() {
    return CheckAndMutate.newBuilder(rowBytes())
      .ifEquals(familyBytes(), qualifierBytes(), valueBytes()).build(new Delete(rowBytes()));
  }

  /** Matches a span whose container-operations attribute holds the given operation names. */
  private static Matcher<SpanData> hasContainerOperations(String... operations) {
    return hasAttributes(
      containsEntryWithStringValuesOf("db.hbase.container_operations", operations));
  }

  /**
   * Asserts the span for {@code tableOperation} without any additional constraint.
   * @param tableOperation operation name expected at the start of the span name
   */
  private void assertTrace(String tableOperation) {
    assertTrace(tableOperation, new IsAnything<SpanData>());
  }

  /**
   * Waits for an ended span whose name contains {@code tableOperation}, then asserts that exactly
   * one such span exists and that it carries the expected name, kind, status and attributes.
   * @param tableOperation operation name expected at the start of the span name
   * @param matcher        extra condition the located span must satisfy
   */
  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
    // n.b. this method implementation must match the one of the same name found in
    // TestAsyncTableTracing
    final TableName tableName = table.getName();
    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
    final Matcher<SpanData> spanLocator =
      allOf(hasName(containsString(tableOperation)), hasEnded());

    Waiter.waitFor(conf, 1000, new MatcherPredicate<>("waiting for span to emit",
      () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));

    List<SpanData> located =
      TRACE_RULE.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
    assertThat(located, hasSize(1));

    SpanData span = located.get(0);
    assertThat(span,
      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
  }

  @Test
  public void testPut() throws IOException {
    Put put = new Put(rowBytes()).addColumn(familyBytes(), qualifierBytes(), valueBytes());
    table.put(put);
    assertTrace("PUT");
  }

  @Test
  public void testExists() throws IOException {
    Get get = new Get(rowBytes());
    table.exists(get);
    assertTrace("GET");
  }

  @Test
  public void testGet() throws IOException {
    Get get = new Get(rowBytes());
    table.get(get);
    assertTrace("GET");
  }

  @Test
  public void testDelete() throws IOException {
    Delete delete = new Delete(rowBytes());
    table.delete(delete);
    assertTrace("DELETE");
  }

  @Test
  public void testAppend() throws IOException {
    Append append =
      new Append(rowBytes()).addColumn(familyBytes(), qualifierBytes(), valueBytes());
    table.append(append);
    assertTrace("APPEND");
  }

  @Test
  public void testIncrement() throws IOException {
    Increment increment = new Increment(rowBytes()).addColumn(familyBytes(), qualifierBytes(), 1);
    table.increment(increment);
    assertTrace("INCREMENT");
  }

  @Test
  public void testIncrementColumnValue1() throws IOException {
    table.incrementColumnValue(rowBytes(), familyBytes(), qualifierBytes(), 1);
    assertTrace("INCREMENT");
  }

  @Test
  public void testIncrementColumnValue2() throws IOException {
    table.incrementColumnValue(rowBytes(), familyBytes(), qualifierBytes(), 1,
      Durability.SYNC_WAL);
    assertTrace("INCREMENT");
  }

  @Test
  public void testCheckAndMutate() throws IOException {
    CheckAndMutate single = checkAndDelete();
    table.checkAndMutate(single);
    assertTrace("CHECK_AND_MUTATE", hasContainerOperations("CHECK_AND_MUTATE", "DELETE"));
  }

  @Test
  public void testCheckAndMutateList() throws IOException {
    List<CheckAndMutate> batch = Arrays.asList(checkAndDelete());
    table.checkAndMutate(batch);
    assertTrace("BATCH", hasContainerOperations("CHECK_AND_MUTATE", "DELETE"));
  }

  /** Issues the same list-based call and assertion as {@link #testCheckAndMutateList()}. */
  @Test
  public void testCheckAndMutateAll() throws IOException {
    List<CheckAndMutate> batch = Arrays.asList(checkAndDelete());
    table.checkAndMutate(batch);
    assertTrace("BATCH", hasContainerOperations("CHECK_AND_MUTATE", "DELETE"));
  }

  @Test
  public void testMutateRow() throws Exception {
    byte[] row = rowBytes();
    RowMutations mutations = RowMutations.of(Arrays.asList(new Delete(row)));
    table.mutateRow(mutations);
    assertTrace("BATCH", hasContainerOperations("DELETE"));
  }

  @Test
  public void testExistsList() throws IOException {
    List<Get> gets = Arrays.asList(new Get(rowBytes()));
    table.exists(gets);
    assertTrace("BATCH", hasContainerOperations("GET"));
  }

  @Test
  public void testExistsAll() throws IOException {
    List<Get> gets = Arrays.asList(new Get(rowBytes()));
    table.existsAll(gets);
    assertTrace("BATCH", hasContainerOperations("GET"));
  }

  @Test
  public void testGetList() throws IOException {
    List<Get> gets = Arrays.asList(new Get(rowBytes()));
    table.get(gets);
    assertTrace("BATCH", hasContainerOperations("GET"));
  }

  @Test
  public void testPutList() throws IOException {
    List<Put> puts = Arrays
      .asList(new Put(rowBytes()).addColumn(familyBytes(), qualifierBytes(), valueBytes()));
    table.put(puts);
    assertTrace("BATCH", hasContainerOperations("PUT"));
  }

  @Test
  public void testDeleteList() throws IOException {
    List<Delete> deletes = Lists.newArrayList(new Delete(rowBytes()));
    table.delete(deletes);
    assertTrace("BATCH", hasContainerOperations("DELETE"));
  }

  @Test
  public void testBatchList() throws IOException, InterruptedException {
    List<Delete> actions = Arrays.asList(new Delete(rowBytes()));
    table.batch(actions, null);
    assertTrace("BATCH", hasContainerOperations("DELETE"));
  }

  @Test
  public void testTableClose() throws IOException {
    table.close();
    assertTrace(HTable.class.getSimpleName(), "close", null, TableName.META_TABLE_NAME);
  }
}