001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
027import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
028import static org.hamcrest.MatcherAssert.assertThat;
029import static org.hamcrest.Matchers.allOf;
030import static org.hamcrest.Matchers.containsString;
031import static org.hamcrest.Matchers.greaterThan;
032import static org.hamcrest.Matchers.greaterThanOrEqualTo;
033import static org.hamcrest.Matchers.hasItem;
034import static org.hamcrest.Matchers.hasSize;
035import static org.junit.Assert.fail;
036import static org.mockito.ArgumentMatchers.any;
037import static org.mockito.ArgumentMatchers.anyInt;
038import static org.mockito.ArgumentMatchers.anyLong;
039import static org.mockito.Mockito.doAnswer;
040import static org.mockito.Mockito.mock;
041
042import io.opentelemetry.api.trace.SpanKind;
043import io.opentelemetry.api.trace.StatusCode;
044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
045import io.opentelemetry.sdk.trace.data.SpanData;
046import java.io.IOException;
047import java.util.Arrays;
048import java.util.List;
049import java.util.concurrent.CompletableFuture;
050import java.util.concurrent.CountDownLatch;
051import java.util.concurrent.ForkJoinPool;
052import java.util.concurrent.atomic.AtomicInteger;
053import java.util.concurrent.atomic.AtomicReference;
054import java.util.stream.Collectors;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellBuilderFactory;
058import org.apache.hadoop.hbase.CellBuilderType;
059import org.apache.hadoop.hbase.HBaseClassTestRule;
060import org.apache.hadoop.hbase.HBaseConfiguration;
061import org.apache.hadoop.hbase.HRegionLocation;
062import org.apache.hadoop.hbase.MatcherPredicate;
063import org.apache.hadoop.hbase.ServerName;
064import org.apache.hadoop.hbase.TableName;
065import org.apache.hadoop.hbase.Waiter;
066import org.apache.hadoop.hbase.filter.PrefixFilter;
067import org.apache.hadoop.hbase.ipc.HBaseRpcController;
068import org.apache.hadoop.hbase.security.User;
069import org.apache.hadoop.hbase.security.UserProvider;
070import org.apache.hadoop.hbase.testclassification.ClientTests;
071import org.apache.hadoop.hbase.testclassification.MediumTests;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.hamcrest.Matcher;
074import org.hamcrest.core.IsAnything;
075import org.junit.After;
076import org.junit.Before;
077import org.junit.ClassRule;
078import org.junit.Rule;
079import org.junit.Test;
080import org.junit.experimental.categories.Category;
081import org.mockito.invocation.InvocationOnMock;
082import org.mockito.stubbing.Answer;
083
084import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
085import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
086
087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
101
102@Category({ ClientTests.class, MediumTests.class })
103public class TestAsyncTableTracing {
104
105  @ClassRule
106  public static final HBaseClassTestRule CLASS_RULE =
107    HBaseClassTestRule.forClass(TestAsyncTableTracing.class);
108
109  private static Configuration CONF = HBaseConfiguration.create();
110
111  private ClientService.Interface stub;
112
113  private AsyncConnectionImpl conn;
114
115  private AsyncTable<ScanResultConsumer> table;
116
117  @Rule
118  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
119
120  @Before
121  public void setUp() throws IOException {
122    stub = mock(ClientService.Interface.class);
123    AtomicInteger scanNextCalled = new AtomicInteger(0);
124    doAnswer(new Answer<Void>() {
125
126      @Override
127      public Void answer(InvocationOnMock invocation) throws Throwable {
128        ScanRequest req = invocation.getArgument(1);
129        RpcCallback<ScanResponse> done = invocation.getArgument(2);
130        if (!req.hasScannerId()) {
131          done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
132            .setMoreResultsInRegion(true).setMoreResults(true).build());
133        } else {
134          if (req.hasCloseScanner() && req.getCloseScanner()) {
135            done.run(ScanResponse.getDefaultInstance());
136          } else {
137            Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
138              .setType(Cell.Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
139              .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
140              .setValue(Bytes.toBytes("v")).build();
141            Result result = Result.create(Arrays.asList(cell));
142            ScanResponse.Builder builder = ScanResponse.newBuilder().setScannerId(1).setTtl(800)
143              .addResults(ProtobufUtil.toResult(result));
144            if (req.getLimitOfRows() == 1) {
145              builder.setMoreResultsInRegion(false).setMoreResults(false);
146            } else {
147              builder.setMoreResultsInRegion(true).setMoreResults(true);
148            }
149            ForkJoinPool.commonPool().execute(() -> done.run(builder.build()));
150          }
151        }
152        return null;
153      }
154    }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
155    doAnswer(new Answer<Void>() {
156
157      @Override
158      public Void answer(InvocationOnMock invocation) throws Throwable {
159        ClientProtos.MultiRequest req = invocation.getArgument(1);
160        ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
161        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
162          RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
163          for (ClientProtos.Action ignored : regionAction.getActionList()) {
164            raBuilder.addResultOrException(
165              ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
166          }
167          builder.addRegionActionResult(raBuilder);
168        }
169        ClientProtos.MultiResponse resp = builder.build();
170        RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
171        ForkJoinPool.commonPool().execute(() -> done.run(resp));
172        return null;
173      }
174    }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any());
175    doAnswer(new Answer<Void>() {
176
177      @Override
178      public Void answer(InvocationOnMock invocation) throws Throwable {
179        MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation();
180        MutateResponse resp;
181        switch (req.getMutateType()) {
182          case INCREMENT:
183            ColumnValue value = req.getColumnValue(0);
184            QualifierValue qvalue = value.getQualifierValue(0);
185            Cell cell =
186              CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
187                .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray())
188                .setQualifier(qvalue.getQualifier().toByteArray())
189                .setValue(qvalue.getValue().toByteArray()).build();
190            resp = MutateResponse.newBuilder()
191              .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build();
192            break;
193          default:
194            resp = MutateResponse.getDefaultInstance();
195            break;
196        }
197        RpcCallback<MutateResponse> done = invocation.getArgument(2);
198        ForkJoinPool.commonPool().execute(() -> done.run(resp));
199        return null;
200      }
201    }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any());
202    doAnswer(new Answer<Void>() {
203
204      @Override
205      public Void answer(InvocationOnMock invocation) throws Throwable {
206        RpcCallback<GetResponse> done = invocation.getArgument(2);
207        ForkJoinPool.commonPool().execute(() -> done.run(GetResponse.getDefaultInstance()));
208        return null;
209      }
210    }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
211    final User user = UserProvider.instantiate(CONF).getCurrent();
212    conn =
213      new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", user) {
214
215        @Override
216        AsyncRegionLocator getLocator() {
217          AsyncRegionLocator locator = mock(AsyncRegionLocator.class);
218          Answer<CompletableFuture<HRegionLocation>> answer =
219            new Answer<CompletableFuture<HRegionLocation>>() {
220
221              @Override
222              public CompletableFuture<HRegionLocation> answer(InvocationOnMock invocation)
223                throws Throwable {
224                TableName tableName = invocation.getArgument(0);
225                RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
226                ServerName serverName = ServerName.valueOf("rs", 16010, 12345);
227                HRegionLocation loc = new HRegionLocation(info, serverName);
228                return CompletableFuture.completedFuture(loc);
229              }
230            };
231          doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
232            any(RegionLocateType.class), anyLong());
233          doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class),
234            anyInt(), any(RegionLocateType.class), anyLong());
235          return locator;
236        }
237
238        @Override
239        ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
240          return stub;
241        }
242      };
243    table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool());
244  }
245
246  @After
247  public void tearDown() throws IOException {
248    Closeables.close(conn, true);
249  }
250
251  private void assertTrace(String tableOperation) {
252    assertTrace(tableOperation, new IsAnything<>());
253  }
254
255  private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
256    // n.b. this method implementation must match the one of the same name found in
257    // TestHTableTracing
258    final TableName tableName = table.getName();
259    final Matcher<SpanData> spanLocator =
260      allOf(hasName(containsString(tableOperation)), hasEnded());
261    final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
262
263    Waiter.waitFor(CONF, 1000, new MatcherPredicate<>("waiting for span to emit",
264      () -> traceRule.getSpans(), hasItem(spanLocator)));
265    List<SpanData> candidateSpans =
266      traceRule.getSpans().stream().filter(spanLocator::matches).collect(Collectors.toList());
267    assertThat(candidateSpans, hasSize(1));
268    SpanData data = candidateSpans.iterator().next();
269    assertThat(data,
270      allOf(hasName(expectedName), hasKind(SpanKind.CLIENT), hasStatusWithCode(StatusCode.OK),
271        buildConnectionAttributesMatcher(conn), buildTableAttributesMatcher(tableName), matcher));
272  }
273
274  @Test
275  public void testExists() {
276    table.exists(new Get(Bytes.toBytes(0))).join();
277    assertTrace("GET");
278  }
279
280  @Test
281  public void testGet() {
282    table.get(new Get(Bytes.toBytes(0))).join();
283    assertTrace("GET");
284  }
285
286  @Test
287  public void testPut() {
288    table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
289      Bytes.toBytes("v"))).join();
290    assertTrace("PUT");
291  }
292
293  @Test
294  public void testDelete() {
295    table.delete(new Delete(Bytes.toBytes(0))).join();
296    assertTrace("DELETE");
297  }
298
299  @Test
300  public void testAppend() {
301    table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
302      Bytes.toBytes("v"))).join();
303    assertTrace("APPEND");
304  }
305
306  @Test
307  public void testIncrement() {
308    table
309      .increment(
310        new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
311      .join();
312    assertTrace("INCREMENT");
313  }
314
315  @Test
316  public void testIncrementColumnValue1() {
317    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
318      .join();
319    assertTrace("INCREMENT");
320  }
321
322  @Test
323  public void testIncrementColumnValue2() {
324    table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
325      Durability.ASYNC_WAL).join();
326    assertTrace("INCREMENT");
327  }
328
329  @Test
330  public void testCheckAndMutate() {
331    table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
332      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
333      .build(new Delete(Bytes.toBytes(0)))).join();
334    assertTrace("CHECK_AND_MUTATE");
335  }
336
337  @Test
338  public void testCheckAndMutateList() {
339    CompletableFuture
340      .allOf(table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
341        .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
342        .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
343      .join();
344    assertTrace("BATCH",
345      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
346        "CHECK_AND_MUTATE", "DELETE")));
347  }
348
349  @Test
350  public void testCheckAndMutateAll() {
351    table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
352      .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
353      .build(new Delete(Bytes.toBytes(0))))).join();
354    assertTrace("BATCH",
355      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations",
356        "CHECK_AND_MUTATE", "DELETE")));
357  }
358
359  private void testCheckAndMutateBuilder(Row op) {
360    AsyncTable.CheckAndMutateBuilder builder =
361      table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq"))
362        .ifEquals(Bytes.toBytes("v"));
363    if (op instanceof Put) {
364      Put put = (Put) op;
365      builder.thenPut(put).join();
366    } else if (op instanceof Delete) {
367      Delete delete = (Delete) op;
368      builder.thenDelete(delete).join();
369    } else if (op instanceof RowMutations) {
370      RowMutations mutations = (RowMutations) op;
371      builder.thenMutate(mutations).join();
372    } else {
373      fail("unsupported CheckAndPut operation " + op);
374    }
375    assertTrace("CHECK_AND_MUTATE");
376  }
377
378  @Test
379  public void testCheckAndMutateBuilderThenPut() {
380    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
381      Bytes.toBytes("v"));
382    testCheckAndMutateBuilder(put);
383  }
384
385  @Test
386  public void testCheckAndMutateBuilderThenDelete() {
387    testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
388  }
389
390  @Test
391  public void testCheckAndMutateBuilderThenMutations() throws IOException {
392    RowMutations mutations = new RowMutations(Bytes.toBytes(0))
393      .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
394        Bytes.toBytes("v")))
395      .add((Mutation) new Delete(Bytes.toBytes(0)));
396    testCheckAndMutateBuilder(mutations);
397  }
398
399  private void testCheckAndMutateWithFilterBuilder(Row op) {
400    // use of `PrefixFilter` is completely arbitrary here.
401    AsyncTable.CheckAndMutateWithFilterBuilder builder =
402      table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
403    if (op instanceof Put) {
404      Put put = (Put) op;
405      builder.thenPut(put).join();
406    } else if (op instanceof Delete) {
407      Delete delete = (Delete) op;
408      builder.thenDelete(delete).join();
409    } else if (op instanceof RowMutations) {
410      RowMutations mutations = (RowMutations) op;
411      builder.thenMutate(mutations).join();
412    } else {
413      fail("unsupported CheckAndPut operation " + op);
414    }
415    assertTrace("CHECK_AND_MUTATE");
416  }
417
418  @Test
419  public void testCheckAndMutateWithFilterBuilderThenPut() {
420    Put put = new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
421      Bytes.toBytes("v"));
422    testCheckAndMutateWithFilterBuilder(put);
423  }
424
425  @Test
426  public void testCheckAndMutateWithFilterBuilderThenDelete() {
427    testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
428  }
429
430  @Test
431  public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
432    RowMutations mutations = new RowMutations(Bytes.toBytes(0))
433      .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"),
434        Bytes.toBytes("v")))
435      .add((Mutation) new Delete(Bytes.toBytes(0)));
436    testCheckAndMutateWithFilterBuilder(mutations);
437  }
438
439  @Test
440  public void testMutateRow() throws IOException {
441    final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
442      .add((Mutation) new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
443        Bytes.toBytes("v")))
444      .add((Mutation) new Delete(Bytes.toBytes(0)));
445    table.mutateRow(mutations).join();
446    assertTrace("BATCH", hasAttributes(
447      containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
448  }
449
450  @Test
451  public void testScanAll() {
452    table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
453    assertTrace("SCAN");
454  }
455
456  @Test
457  public void testScan() throws Throwable {
458    final CountDownLatch doneSignal = new CountDownLatch(1);
459    final AtomicInteger count = new AtomicInteger();
460    final AtomicReference<Throwable> throwable = new AtomicReference<>();
461    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
462    table.scan(scan, new ScanResultConsumer() {
463      @Override
464      public boolean onNext(Result result) {
465        if (result.getRow() != null) {
466          count.incrementAndGet();
467        }
468        return true;
469      }
470
471      @Override
472      public void onError(Throwable error) {
473        throwable.set(error);
474        doneSignal.countDown();
475      }
476
477      @Override
478      public void onComplete() {
479        doneSignal.countDown();
480      }
481    });
482    doneSignal.await();
483    if (throwable.get() != null) {
484      throw throwable.get();
485    }
486    assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
487    assertTrace("SCAN");
488  }
489
490  @Test
491  public void testGetScanner() {
492    final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
493    try (ResultScanner scanner = table.getScanner(scan)) {
494      int count = 0;
495      for (Result result : scanner) {
496        if (result.getRow() != null) {
497          count++;
498        }
499      }
500      // do something with it.
501      assertThat(count, greaterThanOrEqualTo(0));
502    }
503    assertTrace("SCAN");
504  }
505
506  @Test
507  public void testExistsList() {
508    CompletableFuture
509      .allOf(
510        table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
511      .join();
512    assertTrace("BATCH",
513      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
514  }
515
516  @Test
517  public void testExistsAll() {
518    table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
519    assertTrace("BATCH",
520      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
521  }
522
523  @Test
524  public void testGetList() {
525    CompletableFuture
526      .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
527      .join();
528    assertTrace("BATCH",
529      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
530  }
531
532  @Test
533  public void testGetAll() {
534    table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
535    assertTrace("BATCH",
536      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
537  }
538
539  @Test
540  public void testPutList() {
541    CompletableFuture
542      .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
543        Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
544      .join();
545    assertTrace("BATCH",
546      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
547  }
548
549  @Test
550  public void testPutAll() {
551    table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
552      Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
553    assertTrace("BATCH",
554      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
555  }
556
557  @Test
558  public void testDeleteList() {
559    CompletableFuture
560      .allOf(
561        table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
562      .join();
563    assertTrace("BATCH",
564      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
565  }
566
567  @Test
568  public void testDeleteAll() {
569    table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
570    assertTrace("BATCH",
571      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
572  }
573
574  @Test
575  public void testBatch() {
576    CompletableFuture
577      .allOf(
578        table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
579      .join();
580    assertTrace("BATCH",
581      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
582  }
583
584  @Test
585  public void testBatchAll() {
586    table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
587    assertTrace("BATCH",
588      hasAttributes(containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
589  }
590}