001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations;
026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
028import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
029
030import com.google.protobuf.RpcChannel;
031import io.opentelemetry.api.trace.Span;
032import io.opentelemetry.api.trace.StatusCode;
033import io.opentelemetry.context.Scope;
034import java.io.IOException;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.List;
038import java.util.Map;
039import java.util.concurrent.CompletableFuture;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.function.Function;
044import java.util.function.Supplier;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.hbase.CompareOperator;
047import org.apache.hadoop.hbase.DoNotRetryIOException;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.HRegionLocation;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
052import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
053import org.apache.hadoop.hbase.filter.Filter;
054import org.apache.hadoop.hbase.io.TimeRange;
055import org.apache.hadoop.hbase.ipc.HBaseRpcController;
056import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
057import org.apache.hadoop.hbase.trace.TraceUtil;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.ReflectionUtils;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
065import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
066import org.apache.hbase.thirdparty.io.netty.util.Timer;
067
068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
078
079/**
080 * The implementation of RawAsyncTable.
081 * <p/>
082 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
083 * be finished inside the rpc framework thread, which means that the callbacks registered to the
084 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
085 * this class should not try to do time consuming tasks in the callbacks.
086 * @since 2.0.0
087 * @see AsyncTableImpl
088 */
089@InterfaceAudience.Private
090class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
091
092  private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
093
094  private final AsyncConnectionImpl conn;
095
096  private final Timer retryTimer;
097
098  private final TableName tableName;
099
100  private final int defaultScannerCaching;
101
102  private final long defaultScannerMaxResultSize;
103
104  private final long rpcTimeoutNs;
105
106  private final long readRpcTimeoutNs;
107
108  private final long writeRpcTimeoutNs;
109
110  private final long operationTimeoutNs;
111
112  private final long scanTimeoutNs;
113
114  private final long pauseNs;
115
116  private final long pauseNsForServerOverloaded;
117
118  private final int maxAttempts;
119
120  private final int startLogErrorsCnt;
121
122  private final Map<String, byte[]> requestAttributes;
123
124  RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
125    this.conn = conn;
126    this.retryTimer = retryTimer;
127    this.tableName = builder.tableName;
128    this.rpcTimeoutNs = builder.rpcTimeoutNs;
129    this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
130    this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
131    this.operationTimeoutNs = builder.operationTimeoutNs;
132    this.scanTimeoutNs = builder.scanTimeoutNs;
133    this.pauseNs = builder.pauseNs;
134    if (builder.pauseNsForServerOverloaded < builder.pauseNs) {
135      LOG.warn(
136        "Configured value of pauseNsForServerOverloaded is {} ms, which is less than"
137          + " the normal pause value {} ms, use the greater one instead",
138        TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded),
139        TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
140      this.pauseNsForServerOverloaded = builder.pauseNs;
141    } else {
142      this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded;
143    }
144    this.maxAttempts = builder.maxAttempts;
145    this.startLogErrorsCnt = builder.startLogErrorsCnt;
146    this.defaultScannerCaching = tableName.isSystemTable()
147      ? conn.connConf.getMetaScannerCaching()
148      : conn.connConf.getScannerCaching();
149    this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
150    this.requestAttributes = builder.requestAttributes;
151  }
152
153  @Override
154  public TableName getName() {
155    return tableName;
156  }
157
158  @Override
159  public Configuration getConfiguration() {
160    return conn.getConfiguration();
161  }
162
163  @Override
164  public CompletableFuture<TableDescriptor> getDescriptor() {
165    return conn.getAdmin().getDescriptor(tableName);
166  }
167
168  @Override
169  public AsyncTableRegionLocator getRegionLocator() {
170    return conn.getRegionLocator(tableName);
171  }
172
173  @FunctionalInterface
174  private interface Converter<D, I, S> {
175    D convert(I info, S src) throws IOException;
176  }
177
178  @FunctionalInterface
179  private interface RpcCall<RESP, REQ> {
180    void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
181      RpcCallback<RESP> done);
182  }
183
184  private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
185    HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
186    Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
187    Converter<RESP, HBaseRpcController, PRESP> respConverter) {
188    CompletableFuture<RESP> future = new CompletableFuture<>();
189    try {
190      rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
191        new RpcCallback<PRESP>() {
192
193          @Override
194          public void run(PRESP resp) {
195            if (controller.failed()) {
196              future.completeExceptionally(controller.getFailed());
197            } else {
198              try {
199                future.complete(respConverter.convert(controller, resp));
200              } catch (IOException e) {
201                future.completeExceptionally(e);
202              }
203            }
204          }
205        });
206    } catch (IOException e) {
207      future.completeExceptionally(e);
208    }
209    return future;
210  }
211
212  private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
213    HRegionLocation loc, ClientService.Interface stub, REQ req,
214    Converter<MutateRequest, byte[], REQ> reqConvert,
215    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
216    return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
217      respConverter);
218  }
219
220  private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
221    HRegionLocation loc, ClientService.Interface stub, REQ req,
222    Converter<MutateRequest, byte[], REQ> reqConvert) {
223    return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> {
224      return null;
225    });
226  }
227
228  private static Result toResult(HBaseRpcController controller, MutateResponse resp)
229    throws IOException {
230    if (!resp.hasResult()) {
231      return null;
232    }
233    return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner());
234  }
235
236  @FunctionalInterface
237  private interface NoncedConverter<D, I, S> {
238    D convert(I info, S src, long nonceGroup, long nonce) throws IOException;
239  }
240
241  private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce,
242    HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
243    NoncedConverter<MutateRequest, byte[], REQ> reqConvert,
244    Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
245    return mutate(controller, loc, stub, req,
246      (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter);
247  }
248
249  private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) {
250    return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority)
251      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
252      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
253      .pause(pauseNs, TimeUnit.NANOSECONDS)
254      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
255      .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes)
256      .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes);
257  }
258
259  private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
260    newCaller(R row, long rpcTimeoutNs) {
261    return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs);
262  }
263
264  private CompletableFuture<Result> get(Get get, int replicaId) {
265    return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
266      .action((controller, loc, stub) -> RawAsyncTableImpl.<Get, GetRequest, GetResponse,
267        Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest,
268          (s, c, req, done) -> s.get(c, req, done),
269          (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
270      .replicaId(replicaId).call();
271  }
272
273  private TableOperationSpanBuilder newTableOperationSpanBuilder() {
274    return new TableOperationSpanBuilder(conn).setTableName(tableName);
275  }
276
277  @Override
278  public CompletableFuture<Result> get(Get get) {
279    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get);
280    return tracedFuture(
281      () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
282        RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
283        conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
284      supplier);
285  }
286
287  @Override
288  public CompletableFuture<Void> put(Put put) {
289    validatePut(put, conn.connConf.getMaxKeyValueSize());
290    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put);
291    return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs)
292      .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub,
293        put, RequestConverter::buildMutateRequest))
294      .call(), supplier);
295  }
296
297  @Override
298  public CompletableFuture<Void> delete(Delete delete) {
299    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete);
300    return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs)
301      .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc,
302        stub, delete, RequestConverter::buildMutateRequest))
303      .call(), supplier);
304  }
305
306  @Override
307  public CompletableFuture<Result> append(Append append) {
308    checkHasFamilies(append);
309    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append);
310    return tracedFuture(() -> {
311      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
312      long nonce = conn.getNonceGenerator().newNonce();
313      return this.<Result, Append> newCaller(append, rpcTimeoutNs)
314        .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce,
315          controller, loc, stub, append, RequestConverter::buildMutateRequest,
316          RawAsyncTableImpl::toResult))
317        .call();
318    }, supplier);
319  }
320
321  @Override
322  public CompletableFuture<Result> increment(Increment increment) {
323    checkHasFamilies(increment);
324    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment);
325    return tracedFuture(() -> {
326      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
327      long nonce = conn.getNonceGenerator().newNonce();
328      return this.<Result, Increment> newCaller(increment, rpcTimeoutNs)
329        .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce,
330          controller, loc, stub, increment, RequestConverter::buildMutateRequest,
331          RawAsyncTableImpl::toResult))
332        .call();
333    }, supplier);
334  }
335
336  private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
337
338    private final byte[] row;
339
340    private final byte[] family;
341
342    private byte[] qualifier;
343
344    private TimeRange timeRange;
345
346    private CompareOperator op;
347
348    private byte[] value;
349
350    public CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
351      this.row = Preconditions.checkNotNull(row, "row is null");
352      this.family = Preconditions.checkNotNull(family, "family is null");
353    }
354
355    @Override
356    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
357      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
358        + " an empty byte array, or just do not call this method if you want a null qualifier");
359      return this;
360    }
361
362    @Override
363    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
364      this.timeRange = timeRange;
365      return this;
366    }
367
368    @Override
369    public CheckAndMutateBuilder ifNotExists() {
370      this.op = CompareOperator.EQUAL;
371      this.value = null;
372      return this;
373    }
374
375    @Override
376    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
377      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
378      this.value = Preconditions.checkNotNull(value, "value is null");
379      return this;
380    }
381
382    private void preCheck() {
383      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
384        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
385    }
386
387    @Override
388    public CompletableFuture<Boolean> thenPut(Put put) {
389      validatePut(put, conn.connConf.getMaxKeyValueSize());
390      preCheck();
391      final Supplier<Span> supplier = newTableOperationSpanBuilder()
392        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
393        .setContainerOperations(put);
394      return tracedFuture(
395        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
396          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
397            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
398              null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
399            (c, r) -> r.getProcessed()))
400          .call(),
401        supplier);
402    }
403
404    @Override
405    public CompletableFuture<Boolean> thenDelete(Delete delete) {
406      preCheck();
407      final Supplier<Span> supplier = newTableOperationSpanBuilder()
408        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
409        .setContainerOperations(delete);
410      return tracedFuture(
411        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
412          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
413            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
414              null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
415            (c, r) -> r.getProcessed()))
416          .call(),
417        supplier);
418    }
419
420    @Override
421    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
422      preCheck();
423      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
424      final Supplier<Span> supplier = newTableOperationSpanBuilder()
425        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
426        .setContainerOperations(mutations);
427      return tracedFuture(() -> RawAsyncTableImpl.this
428        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
429        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
430          mutations,
431          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
432            null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
433          CheckAndMutateResult::isSuccess))
434        .call(), supplier);
435    }
436  }
437
438  @Override
439  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
440    return new CheckAndMutateBuilderImpl(row, family);
441  }
442
443  private final class CheckAndMutateWithFilterBuilderImpl
444    implements CheckAndMutateWithFilterBuilder {
445
446    private final byte[] row;
447
448    private final Filter filter;
449
450    private TimeRange timeRange;
451
452    public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
453      this.row = Preconditions.checkNotNull(row, "row is null");
454      this.filter = Preconditions.checkNotNull(filter, "filter is null");
455    }
456
457    @Override
458    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
459      this.timeRange = timeRange;
460      return this;
461    }
462
463    @Override
464    public CompletableFuture<Boolean> thenPut(Put put) {
465      validatePut(put, conn.connConf.getMaxKeyValueSize());
466      final Supplier<Span> supplier = newTableOperationSpanBuilder()
467        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
468        .setContainerOperations(put);
469      return tracedFuture(
470        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
471          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
472            (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
473              timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
474            (c, r) -> r.getProcessed()))
475          .call(),
476        supplier);
477    }
478
479    @Override
480    public CompletableFuture<Boolean> thenDelete(Delete delete) {
481      final Supplier<Span> supplier = newTableOperationSpanBuilder()
482        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
483        .setContainerOperations(delete);
484      return tracedFuture(
485        () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
486          .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
487            (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter,
488              timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
489            (c, r) -> r.getProcessed()))
490          .call(),
491        supplier);
492    }
493
494    @Override
495    public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
496      validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
497      final Supplier<Span> supplier = newTableOperationSpanBuilder()
498        .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
499        .setContainerOperations(mutations);
500      return tracedFuture(() -> RawAsyncTableImpl.this
501        .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
502        .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
503          mutations,
504          (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
505            timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
506          CheckAndMutateResult::isSuccess))
507        .call(), supplier);
508    }
509  }
510
511  @Override
512  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
513    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
514  }
515
516  @Override
517  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
518    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate)
519      .setContainerOperations(checkAndMutate.getAction());
520    return tracedFuture(() -> {
521      if (
522        checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
523          || checkAndMutate.getAction() instanceof Increment
524          || checkAndMutate.getAction() instanceof Append
525      ) {
526        Mutation mutation = (Mutation) checkAndMutate.getAction();
527        if (mutation instanceof Put) {
528          validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
529        }
530        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
531        long nonce = conn.getNonceGenerator().newNonce();
532        return RawAsyncTableImpl.this
533          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(),
534            rpcTimeoutNs)
535          .action(
536            (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation,
537              (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(),
538                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
539                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
540                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce),
541              (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
542          .call();
543      } else if (checkAndMutate.getAction() instanceof RowMutations) {
544        RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
545        validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize());
546        long nonceGroup = conn.getNonceGenerator().getNonceGroup();
547        long nonce = conn.getNonceGenerator().newNonce();
548        return RawAsyncTableImpl.this
549          .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(),
550            rpcTimeoutNs)
551          .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult,
552            CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations,
553              (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(),
554                checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
555                checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
556                checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce),
557              resp -> resp))
558          .call();
559      } else {
560        CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>();
561        future.completeExceptionally(new DoNotRetryIOException(
562          "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
563        return future;
564      }
565    }, supplier);
566  }
567
568  @Override
569  public List<CompletableFuture<CheckAndMutateResult>>
570    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
571    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates)
572      .setContainerOperations(checkAndMutates);
573    return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream()
574      .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier);
575  }
576
577  // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
578  // so here I write a new method as I do not want to change the abstraction of call method.
579  @SuppressWarnings("unchecked")
580  private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
581    HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
582    Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) {
583    CompletableFuture<RESP> future = new CompletableFuture<>();
584    try {
585      byte[] regionName = loc.getRegion().getRegionName();
586      MultiRequest req = reqConvert.convert(regionName, mutation);
587      stub.multi(controller, req, new RpcCallback<MultiResponse>() {
588
589        @Override
590        public void run(MultiResponse resp) {
591          if (controller.failed()) {
592            future.completeExceptionally(controller.getFailed());
593          } else {
594            try {
595              org.apache.hadoop.hbase.client.MultiResponse multiResp =
596                ResponseConverter.getResults(req, resp, controller.cellScanner());
597              ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
598                loc.getServerName(), multiResp);
599              Throwable ex = multiResp.getException(regionName);
600              if (ex != null) {
601                future.completeExceptionally(ex instanceof IOException
602                  ? ex
603                  : new IOException(
604                    "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
605              } else {
606                future.complete(
607                  respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0)));
608              }
609            } catch (IOException e) {
610              future.completeExceptionally(e);
611            }
612          }
613        }
614      });
615    } catch (IOException e) {
616      future.completeExceptionally(e);
617    }
618    return future;
619  }
620
621  @Override
622  public CompletableFuture<Result> mutateRow(RowMutations mutations) {
623    validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
624    long nonceGroup = conn.getNonceGenerator().getNonceGroup();
625    long nonce = conn.getNonceGenerator().newNonce();
626    final Supplier<Span> supplier =
627      newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations);
628    return tracedFuture(
629      () -> this
630        .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
631        .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub,
632          mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
633          resp -> resp))
634        .call(),
635      supplier);
636  }
637
638  private Scan setDefaultScanConfig(Scan scan) {
639    // always create a new scan object as we may reset the start row later.
640    Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan);
641    if (newScan.getCaching() <= 0) {
642      newScan.setCaching(defaultScannerCaching);
643    }
644    if (newScan.getMaxResultSize() <= 0) {
645      newScan.setMaxResultSize(defaultScannerMaxResultSize);
646    }
647    return newScan;
648  }
649
650  @Override
651  public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
652    new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
653      pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
654      startLogErrorsCnt, requestAttributes).start();
655  }
656
657  private long resultSize2CacheSize(long maxResultSize) {
658    // * 2 if possible
659    return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
660  }
661
662  @Override
663  public AsyncTableResultScanner getScanner(Scan scan) {
664    final long maxCacheSize = resultSize2CacheSize(
665      scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
666    final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
667    final AsyncTableResultScanner scanner =
668      new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
669    scan(scan, scanner);
670    return scanner;
671  }
672
673  @Override
674  public CompletableFuture<List<Result>> scanAll(Scan scan) {
675    CompletableFuture<List<Result>> future = new CompletableFuture<>();
676    List<Result> scanResults = new ArrayList<>();
677    scan(scan, new AdvancedScanResultConsumer() {
678
679      @Override
680      public void onNext(Result[] results, ScanController controller) {
681        scanResults.addAll(Arrays.asList(results));
682      }
683
684      @Override
685      public void onError(Throwable error) {
686        future.completeExceptionally(error);
687      }
688
689      @Override
690      public void onComplete() {
691        future.complete(scanResults);
692      }
693    });
694    return future;
695  }
696
697  @Override
698  public List<CompletableFuture<Result>> get(List<Get> gets) {
699    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets)
700      .setContainerOperations(HBaseSemanticAttributes.Operation.GET);
701    return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
702  }
703
704  @Override
705  public List<CompletableFuture<Void>> put(List<Put> puts) {
706    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts)
707      .setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
708    return tracedFutures(() -> voidMutate(puts), supplier);
709  }
710
711  @Override
712  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
713    final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes)
714      .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
715    return tracedFutures(() -> voidMutate(deletes), supplier);
716  }
717
718  @Override
719  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
720    final Supplier<Span> supplier =
721      newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions);
722    return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
723  }
724
725  private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
726    return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
727      .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
728  }
729
730  private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
731    for (Row action : actions) {
732      if (action instanceof Put) {
733        validatePut((Put) action, conn.connConf.getMaxKeyValueSize());
734      } else if (action instanceof CheckAndMutate) {
735        CheckAndMutate checkAndMutate = (CheckAndMutate) action;
736        if (checkAndMutate.getAction() instanceof Put) {
737          validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
738        } else if (checkAndMutate.getAction() instanceof RowMutations) {
739          validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(),
740            conn.connConf.getMaxKeyValueSize());
741        }
742      } else if (action instanceof RowMutations) {
743        validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize());
744      }
745    }
746    return conn.callerFactory.batch().table(tableName).actions(actions)
747      .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
748      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
749      .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
750      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
751      .setRequestAttributes(requestAttributes).call();
752  }
753
754  @Override
755  public long getRpcTimeout(TimeUnit unit) {
756    return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
757  }
758
759  @Override
760  public long getReadRpcTimeout(TimeUnit unit) {
761    return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
762  }
763
764  @Override
765  public long getWriteRpcTimeout(TimeUnit unit) {
766    return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
767  }
768
769  @Override
770  public long getOperationTimeout(TimeUnit unit) {
771    return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
772  }
773
774  @Override
775  public long getScanTimeout(TimeUnit unit) {
776    return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
777  }
778
779  @Override
780  public Map<String, byte[]> getRequestAttributes() {
781    return requestAttributes;
782  }
783
784  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
785    ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
786    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
787      region, row, rpcTimeoutNs, operationTimeoutNs);
788    final Span span = Span.current();
789    S stub = stubMaker.apply(channel);
790    CompletableFuture<R> future = new CompletableFuture<>();
791    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
792    callable.call(stub, controller, resp -> {
793      try (Scope ignored = span.makeCurrent()) {
794        if (controller.failed()) {
795          final Throwable failure = controller.getFailed();
796          future.completeExceptionally(failure);
797          TraceUtil.setError(span, failure);
798        } else {
799          future.complete(resp);
800          span.setStatus(StatusCode.OK);
801        }
802      } finally {
803        span.end();
804      }
805    });
806    return future;
807  }
808
809  @Override
810  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
811    ServiceCaller<S, R> callable, byte[] row) {
812    return coprocessorService(stubMaker, callable, null, row);
813  }
814
815  private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) {
816    if (isEmptyStopRow(endKey)) {
817      if (isEmptyStopRow(region.getEndKey())) {
818        return true;
819      }
820      return false;
821    } else {
822      if (isEmptyStopRow(region.getEndKey())) {
823        return true;
824      }
825      int c = Bytes.compareTo(endKey, region.getEndKey());
826      // 1. if the region contains endKey
827      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
828      return c < 0 || (c == 0 && !endKeyInclusive);
829    }
830  }
831
832  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
833    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
834    byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
835    AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
836    final Span span = Span.current();
837    if (error != null) {
838      callback.onError(error);
839      TraceUtil.setError(span, error);
840      span.end();
841      return;
842    }
843    unfinishedRequest.incrementAndGet();
844    RegionInfo region = loc.getRegion();
845    if (locateFinished(region, endKey, endKeyInclusive)) {
846      locateFinished.set(true);
847    } else {
848      addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(),
849        RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> {
850          try (Scope ignored = span.makeCurrent()) {
851            onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
852              locateFinished, unfinishedRequest, l, e);
853          }
854        });
855    }
856    addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
857      try (Scope ignored = span.makeCurrent()) {
858        if (e != null) {
859          callback.onRegionError(region, e);
860        } else {
861          callback.onRegionComplete(region, r);
862        }
863        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
864          callback.onComplete();
865        }
866      }
867    });
868  }
869
870  private final class CoprocessorServiceBuilderImpl<S, R>
871    implements CoprocessorServiceBuilder<S, R> {
872
873    private final Function<RpcChannel, S> stubMaker;
874
875    private final ServiceCaller<S, R> callable;
876
877    private final CoprocessorCallback<R> callback;
878
879    private byte[] startKey = HConstants.EMPTY_START_ROW;
880
881    private boolean startKeyInclusive;
882
883    private byte[] endKey = HConstants.EMPTY_END_ROW;
884
885    private boolean endKeyInclusive;
886
887    public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
888      ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
889      this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
890      this.callable = Preconditions.checkNotNull(callable, "callable is null");
891      this.callback = Preconditions.checkNotNull(callback, "callback is null");
892    }
893
894    @Override
895    public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
896      this.startKey = Preconditions.checkNotNull(startKey,
897        "startKey is null. Consider using"
898          + " an empty byte array, or just do not call this method if you want to start selection"
899          + " from the first region");
900      this.startKeyInclusive = inclusive;
901      return this;
902    }
903
904    @Override
905    public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
906      this.endKey = Preconditions.checkNotNull(endKey,
907        "endKey is null. Consider using"
908          + " an empty byte array, or just do not call this method if you want to continue"
909          + " selection to the last region");
910      this.endKeyInclusive = inclusive;
911      return this;
912    }
913
914    @Override
915    public void execute() {
916      final Span span = newTableOperationSpanBuilder()
917        .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build();
918      try (Scope ignored = span.makeCurrent()) {
919        final RegionLocateType regionLocateType =
920          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
921        final CompletableFuture<HRegionLocation> future = conn.getLocator()
922          .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
923        addListener(future, (loc, error) -> {
924          try (Scope ignored1 = span.makeCurrent()) {
925            onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
926              endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error);
927          }
928        });
929      }
930    }
931  }
932
933  @Override
934  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
935    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
936    CoprocessorCallback<R> callback) {
937    return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
938  }
939}