001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher; 027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher; 028import static org.hamcrest.MatcherAssert.assertThat; 029import static org.hamcrest.Matchers.allOf; 030import static org.hamcrest.Matchers.containsString; 031import static org.hamcrest.Matchers.greaterThan; 032import static org.hamcrest.Matchers.greaterThanOrEqualTo; 033import static org.hamcrest.Matchers.hasItem; 034import static org.hamcrest.Matchers.hasSize; 035import static org.junit.Assert.fail; 036import static org.mockito.ArgumentMatchers.any; 037import static org.mockito.ArgumentMatchers.anyInt; 038import static org.mockito.ArgumentMatchers.anyLong; 039import static org.mockito.Mockito.doAnswer; 040import static org.mockito.Mockito.mock; 041 042import io.opentelemetry.api.trace.SpanKind; 043import io.opentelemetry.api.trace.StatusCode; 044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 045import io.opentelemetry.sdk.trace.data.SpanData; 046import java.io.IOException; 047import java.util.Arrays; 048import java.util.List; 049import java.util.concurrent.CompletableFuture; 050import java.util.concurrent.CountDownLatch; 051import java.util.concurrent.ForkJoinPool; 052import java.util.concurrent.atomic.AtomicInteger; 053import java.util.concurrent.atomic.AtomicReference; 054import java.util.stream.Collectors; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.hbase.Cell; 057import org.apache.hadoop.hbase.CellBuilderFactory; 058import org.apache.hadoop.hbase.CellBuilderType; 059import org.apache.hadoop.hbase.HBaseClassTestRule; 060import org.apache.hadoop.hbase.HBaseConfiguration; 061import org.apache.hadoop.hbase.HRegionLocation; 062import org.apache.hadoop.hbase.MatcherPredicate; 063import org.apache.hadoop.hbase.ServerName; 064import org.apache.hadoop.hbase.TableName; 065import org.apache.hadoop.hbase.Waiter; 066import org.apache.hadoop.hbase.filter.PrefixFilter; 067import org.apache.hadoop.hbase.ipc.HBaseRpcController; 068import org.apache.hadoop.hbase.security.User; 069import org.apache.hadoop.hbase.security.UserProvider; 070import org.apache.hadoop.hbase.testclassification.ClientTests; 071import org.apache.hadoop.hbase.testclassification.MediumTests; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.hamcrest.Matcher; 074import org.hamcrest.core.IsAnything; 075import org.junit.After; 076import org.junit.Before; 077import org.junit.ClassRule; 078import org.junit.Rule; 079import org.junit.Test; 080import org.junit.experimental.categories.Category; 081import org.mockito.invocation.InvocationOnMock; 082import org.mockito.stubbing.Answer; 083 084import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 085import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 101 102@Category({ ClientTests.class, MediumTests.class }) 103public class TestAsyncTableTracing { 104 105 @ClassRule 106 public static final HBaseClassTestRule CLASS_RULE = 107 HBaseClassTestRule.forClass(TestAsyncTableTracing.class); 108 109 private static Configuration CONF = HBaseConfiguration.create(); 110 111 private ClientService.Interface stub; 112 113 private AsyncConnectionImpl conn; 114 115 private AsyncTable<ScanResultConsumer> table; 116 117 @Rule 118 public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); 119 120 @Before 121 public void setUp() throws IOException { 122 stub = mock(ClientService.Interface.class); 123 AtomicInteger scanNextCalled = new AtomicInteger(0); 124 doAnswer(new Answer<Void>() { 125 126 @Override 127 public Void answer(InvocationOnMock invocation) throws Throwable { 128 ScanRequest req = invocation.getArgument(1); 129 RpcCallback<ScanResponse> done = invocation.getArgument(2); 130 if (!req.hasScannerId()) { 131 done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) 132 .setMoreResultsInRegion(true).setMoreResults(true).build()); 133 } else { 134 if (req.hasCloseScanner() && req.getCloseScanner()) { 135 done.run(ScanResponse.getDefaultInstance()); 136 } else { 137 Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) 138 .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) 139 .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) 140 .setValue(Bytes.toBytes("v")).build(); 141 Result result = Result.create(Arrays.asList(cell)); 142 ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800) 143 .addResults(ProtobufUtil.toResult(result)); 144 if (req.getLimitOfRows() == 1) { 145 builder.setMoreResultsInRegion(false).setMoreResults(false); 146 } else { 147 builder.setMoreResultsInRegion(true).setMoreResults(true); 148 } 149 ForkJoinPool.commonPool().execute(() -> done.run(builder.build())); 150 } 151 } 152 return null; 153 } 154 }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); 155 doAnswer(new Answer<Void>() { 156 157 @Override 158 public Void answer(InvocationOnMock invocation) throws Throwable { 159 ClientProtos.MultiRequest req = invocation.getArgument(1); 160 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); 161 for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { 162 RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder(); 163 for (ClientProtos.Action ignored : regionAction.getActionList()) { 164 raBuilder.addResultOrException( 165 ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))); 166 } 167 builder.addRegionActionResult(raBuilder); 168 } 169 ClientProtos.MultiResponse resp = builder.build(); 170 RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2); 171 ForkJoinPool.commonPool().execute(() -> done.run(resp)); 172 return null; 173 } 174 }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); 175 doAnswer(new Answer<Void>() { 176 177 @Override 178 public Void answer(InvocationOnMock invocation) throws Throwable { 179 MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); 180 MutateResponse resp; 181 switch (req.getMutateType()) { 182 case INCREMENT: 183 ColumnValue value = req.getColumnValue(0); 184 QualifierValue qvalue = value.getQualifierValue(0); 185 Cell cell = 186 CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put) 187 .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) 188 .setQualifier(qvalue.getQualifier().toByteArray()) 189 .setValue(qvalue.getValue().toByteArray()).build(); 190 resp = MutateResponse.newBuilder() 191 .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); 192 break; 193 default: 194 resp = MutateResponse.getDefaultInstance(); 195 break; 196 } 197 RpcCallback<MutateResponse> done = invocation.getArgument(2); 198 ForkJoinPool.commonPool().execute(() -> done.run(resp)); 199 return null; 200 } 201 }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); 202 doAnswer(new Answer<Void>() { 203 204 @Override 205 public Void answer(InvocationOnMock invocation) throws Throwable { 206 RpcCallback<GetResponse> done = invocation.getArgument(2); 207 ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance())); 208 return null; 209 } 210 }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); 211 final User user = UserProvider.instantiate(CONF).getCurrent(); 212 conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null, 213 user) { 214 215 @Override 216 AsyncRegionLocator getLocator() { 217 AsyncRegionLocator locator = mock(AsyncRegionLocator.class); 218 Answer<CompletableFuture<HRegionLocation>> answer = 219 new Answer<CompletableFuture<HRegionLocation>>() { 220 221 @Override 222 public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation) 223 throws Throwable { 224 TableName tableName = invocation.getArgument(0); 225 RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); 226 ServerName serverName = ServerName.valueOf("rs", 16010, 12345); 227 HRegionLocation loc = new HRegionLocation(info, serverName); 228 return CompletableFuture.completedFuture(loc); 229 } 230 }; 231 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 232 any(RegionLocateType.class), anyLong()); 233 doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), 234 anyInt(), any(RegionLocateType.class), anyLong()); 235 return locator; 236 } 237 238 @Override 239 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 240 return stub; 241 } 242 }; 243 table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool()); 244 } 245 246 @After 247 public void tearDown() throws IOException { 248 Closeables.close(conn, true); 249 } 250 251 private void assertTrace(String tableOperation) { 252 assertTrace(tableOperation, new IsAnything<>()); 253 } 254 255 private void assertTrace(String tableOperation, Matcher<SpanData> matcher) { 256 final TableName tableName = table.getName(); 257 final Matcher<SpanData> spanLocator = 258 allOf(hasName(containsString(tableOperation)), hasEnded()); 259 final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); 260 261 Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit", 262 () -> traceRule.getSpans(), hasItem(spanLocator))); 263 List<SpanData> candidateSpans = 264 traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList()); 265 assertThat(candidateSpans, hasSize(1)); 266 SpanData data = candidateSpans.iterator().next(); 267 assertThat(data, 268 allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK), 269 buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher)); 270 } 271 272 @Test 273 public void testExists() { 274 table.exists(new Get(Bytes.toBytes(0))).join(); 275 assertTrace("GET"); 276 } 277 278 @Test 279 public void testGet() { 280 table.get(new Get(Bytes.toBytes(0))).join(); 281 assertTrace("GET"); 282 } 283 284 @Test 285 public void testPut() { 286 table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 287 Bytes.toBytes("v"))).join(); 288 assertTrace("PUT"); 289 } 290 291 @Test 292 public void testDelete() { 293 table.delete(new Delete(Bytes.toBytes(0))).join(); 294 assertTrace("DELETE"); 295 } 296 297 @Test 298 public void testAppend() { 299 table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 300 Bytes.toBytes("v"))).join(); 301 assertTrace("APPEND"); 302 } 303 304 @Test 305 public void testIncrement() { 306 table 307 .increment( 308 new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) 309 .join(); 310 assertTrace("INCREMENT"); 311 } 312 313 @Test 314 public void testIncrementColumnValue1() { 315 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) 316 .join(); 317 assertTrace("INCREMENT"); 318 } 319 320 @Test 321 public void testIncrementColumnValue2() { 322 table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, 323 Durability.ASYNC_WAL).join(); 324 assertTrace("INCREMENT"); 325 } 326 327 @Test 328 public void testCheckAndMutate() { 329 table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 330 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 331 .build(new Delete(Bytes.toBytes(0)))).join(); 332 assertTrace("CHECK_AND_MUTATE"); 333 } 334 335 @Test 336 public void testCheckAndMutateList() { 337 CompletableFuture 338 .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 339 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 340 .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) 341 .join(); 342 assertTrace("BATCH", 343 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 344 "CHECK_AND_MUTATE", "DELETE"))); 345 } 346 347 @Test 348 public void testCheckAndMutateAll() { 349 table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) 350 .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) 351 .build(new Delete(Bytes.toBytes(0))))).join(); 352 assertTrace("BATCH", 353 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", 354 "CHECK_AND_MUTATE", "DELETE"))); 355 } 356 357 private void testCheckAndMutateBuilder(Row op) { 358 AsyncTable.CheckAndMutateBuilder builder = 359 table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) 360 .ifEquals(Bytes.toBytes("v")); 361 if (op instanceof Put) { 362 Put put = (Put) op; 363 builder.thenPut(put).join(); 364 } else if (op instanceof Delete) { 365 Delete delete = (Delete) op; 366 builder.thenDelete(delete).join(); 367 } else if (op instanceof RowMutations) { 368 RowMutations mutations = (RowMutations) op; 369 builder.thenMutate(mutations).join(); 370 } else { 371 fail("unsupported CheckAndPut operation " + op); 372 } 373 assertTrace("CHECK_AND_MUTATE"); 374 } 375 376 @Test 377 public void testCheckAndMutateBuilderThenPut() { 378 Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 379 Bytes.toBytes("v")); 380 testCheckAndMutateBuilder(put); 381 } 382 383 @Test 384 public void testCheckAndMutateBuilderThenDelete() { 385 testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0))); 386 } 387 388 @Test 389 public void testCheckAndMutateBuilderThenMutations() throws IOException { 390 RowMutations mutations = 391 new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), 392 Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0))); 393 testCheckAndMutateBuilder(mutations); 394 } 395 396 private void testCheckAndMutateWithFilterBuilder(Row op) { 397 // use of `PrefixFilter` is completely arbitrary here. 398 AsyncTable.CheckAndMutateWithFilterBuilder builder = 399 table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0))); 400 if (op instanceof Put) { 401 Put put = (Put) op; 402 builder.thenPut(put).join(); 403 } else if (op instanceof Delete) { 404 Delete delete = (Delete) op; 405 builder.thenDelete(delete).join(); 406 } else if (op instanceof RowMutations) { 407 RowMutations mutations = (RowMutations) op; 408 builder.thenMutate(mutations).join(); 409 } else { 410 fail("unsupported CheckAndPut operation " + op); 411 } 412 assertTrace("CHECK_AND_MUTATE"); 413 } 414 415 @Test 416 public void testCheckAndMutateWithFilterBuilderThenPut() { 417 Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), 418 Bytes.toBytes("v")); 419 testCheckAndMutateWithFilterBuilder(put); 420 } 421 422 @Test 423 public void testCheckAndMutateWithFilterBuilderThenDelete() { 424 testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0))); 425 } 426 427 @Test 428 public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException { 429 RowMutations mutations = 430 new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), 431 Bytes.toBytes("cq"), Bytes.toBytes("v"))).add(new Delete(Bytes.toBytes(0))); 432 testCheckAndMutateWithFilterBuilder(mutations); 433 } 434 435 @Test 436 public void testMutateRow() throws IOException { 437 final RowMutations mutations = new RowMutations(Bytes.toBytes(0)).add(new Put(Bytes.toBytes(0)) 438 .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) 439 .add(new Delete(Bytes.toBytes(0))); 440 table.mutateRow(mutations).join(); 441 assertTrace("BATCH", hasAttributes( 442 containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT"))); 443 } 444 445 @Test 446 public void testScanAll() { 447 table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); 448 assertTrace("SCAN"); 449 } 450 451 @Test 452 public void testScan() throws Throwable { 453 final CountDownLatch doneSignal = new CountDownLatch(1); 454 final AtomicInteger count = new AtomicInteger(); 455 final AtomicReference<Throwable> throwable = new AtomicReference<>(); 456 final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); 457 table.scan(scan, new ScanResultConsumer() { 458 @Override 459 public boolean onNext(Result result) { 460 if (result.getRow() != null) { 461 count.incrementAndGet(); 462 } 463 return true; 464 } 465 466 @Override 467 public void onError(Throwable error) { 468 throwable.set(error); 469 doneSignal.countDown(); 470 } 471 472 @Override 473 public void onComplete() { 474 doneSignal.countDown(); 475 } 476 }); 477 doneSignal.await(); 478 if (throwable.get() != null) { 479 throw throwable.get(); 480 } 481 assertThat("user code did not run. check test setup.", count.get(), greaterThan(0)); 482 assertTrace("SCAN"); 483 } 484 485 @Test 486 public void testGetScanner() { 487 final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); 488 try (ResultScanner scanner = table.getScanner(scan)) { 489 int count = 0; 490 for (Result result : scanner) { 491 if (result.getRow() != null) { 492 count++; 493 } 494 } 495 // do something with it. 496 assertThat(count, greaterThanOrEqualTo(0)); 497 } 498 assertTrace("SCAN"); 499 } 500 501 @Test 502 public void testExistsList() { 503 CompletableFuture 504 .allOf( 505 table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 506 .join(); 507 assertTrace("BATCH", 508 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 509 } 510 511 @Test 512 public void testExistsAll() { 513 table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); 514 assertTrace("BATCH", 515 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 516 } 517 518 @Test 519 public void testGetList() { 520 CompletableFuture 521 .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 522 .join(); 523 assertTrace("BATCH", 524 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 525 } 526 527 @Test 528 public void testGetAll() { 529 table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); 530 assertTrace("BATCH", 531 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); 532 } 533 534 @Test 535 public void testPutList() { 536 CompletableFuture 537 .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 538 Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) 539 .join(); 540 assertTrace("BATCH", 541 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 542 } 543 544 @Test 545 public void testPutAll() { 546 table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), 547 Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); 548 assertTrace("BATCH", 549 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); 550 } 551 552 @Test 553 public void testDeleteList() { 554 CompletableFuture 555 .allOf( 556 table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 557 .join(); 558 assertTrace("BATCH", 559 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 560 } 561 562 @Test 563 public void testDeleteAll() { 564 table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 565 assertTrace("BATCH", 566 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 567 } 568 569 @Test 570 public void testBatch() { 571 CompletableFuture 572 .allOf( 573 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) 574 .join(); 575 assertTrace("BATCH", 576 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 577 } 578 579 @Test 580 public void testBatchAll() { 581 table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); 582 assertTrace("BATCH", 583 hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); 584 } 585}