001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. The imports below are part of our public API.
022// SEE ABOVE NOTE!
023
024import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
025
026import com.google.protobuf.Descriptors;
027import com.google.protobuf.Message;
028import com.google.protobuf.Service;
029import com.google.protobuf.ServiceException;
030import io.opentelemetry.api.trace.Span;
031import io.opentelemetry.api.trace.StatusCode;
032import io.opentelemetry.context.Context;
033import io.opentelemetry.context.Scope;
034import java.io.IOException;
035import java.io.InterruptedIOException;
036import java.util.ArrayList;
037import java.util.Collections;
038import java.util.List;
039import java.util.Map;
040import java.util.Optional;
041import java.util.TreeMap;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Future;
045import java.util.concurrent.SynchronousQueue;
046import java.util.concurrent.ThreadPoolExecutor;
047import java.util.concurrent.TimeUnit;
048import java.util.function.Supplier;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.CompareOperator;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.HTableDescriptor;
054import org.apache.hadoop.hbase.TableName;
055import org.apache.hadoop.hbase.client.coprocessor.Batch;
056import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
057import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
058import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
059import org.apache.hadoop.hbase.filter.Filter;
060import org.apache.hadoop.hbase.io.TimeRange;
061import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
062import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
063import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
064import org.apache.hadoop.hbase.trace.TraceUtil;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.Pair;
067import org.apache.hadoop.hbase.util.ReflectionUtils;
068import org.apache.hadoop.hbase.util.Threads;
069import org.apache.yetus.audience.InterfaceAudience;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
075
076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
077import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
078import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
083
084/**
085 * An implementation of {@link Table}. Used to communicate with a single HBase table. Lightweight.
086 * Get as needed and just close when done. Instances of this class SHOULD NOT be constructed
087 * directly. Obtain an instance via {@link Connection}. See {@link ConnectionFactory} class comment
088 * for an example of how.
089 * <p>
090 * This class is thread safe since 2.0.0 if not invoking any of the setter methods. All setters are
091 * moved into {@link TableBuilder} and reserved here only for keeping backward compatibility, and
092 * TODO will be removed soon.
093 * <p>
094 * HTable is no longer a client API. Use {@link Table} instead. It is marked
095 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in <a href=
096 * "https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop
097 * Interface Classification</a> There are no guarantees for backwards source / binary compatibility
098 * and methods or class can change or go away without deprecation.
099 * @see Table
100 * @see Admin
101 * @see Connection
102 * @see ConnectionFactory
103 */
104@InterfaceAudience.Private
105public class HTable implements Table {
  private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
  // Consistency applied to a Get that arrives without one set (see get(Get, boolean)).
  private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
  // Connection handed to the constructor; checked there to be non-null.
  private final ClusterConnection connection;
  // Name of the table this instance talks to, taken from the table builder.
  private final TableName tableName;
  // Both configuration objects are read from the connection in the constructor.
  private final Configuration configuration;
  private final ConnectionConfiguration connConfiguration;
  private boolean closed = false;
  // Fallbacks applied to a Scan whose caching / max result size is not positive.
  private final int scannerCaching;
  private final long scannerMaxResultSize;
  private final ExecutorService pool; // For Multi & Scan
  private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc
  private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
  private int readRpcTimeoutMs; // timeout for each read rpc request
  private int writeRpcTimeoutMs; // timeout for each write rpc request

  // Timeouts handed to the scanners created by getScannerInternal; copied from the table builder.
  private final int scanReadRpcTimeout;
  private final int scanTimeout;
  // True only when the constructor created the pool itself (no pool was supplied).
  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
  // Region locator for this table, created in the constructor.
  private final HRegionLocator locator;

  /** The Async process for batch */
  AsyncProcess multiAp;
  // Factories default to ones derived from the connection/configuration when null is passed in.
  private final RpcRetryingCallerFactory rpcCallerFactory;
  private final RpcControllerFactory rpcControllerFactory;

  // Attributes forwarded to every callable, scanner and batch task created by this table.
  private final Map<String, byte[]> requestAttributes;
132
133  // Marked Private @since 1.0
134  @InterfaceAudience.Private
135  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
136    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
137    if (maxThreads == 0) {
138      maxThreads = 1; // is there a better default?
139    }
140    int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1);
141    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
142
143    // Using the "direct handoff" approach, new threads will only be created
144    // if it is necessary and will grow unbounded. This could be bad but in HCM
145    // we only create as many Runnables as there are region servers. It means
146    // it also scales when new region servers are added.
147    ThreadPoolExecutor pool =
148      new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS,
149        new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d")
150          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
151    pool.allowCoreThreadTimeOut(true);
152    return pool;
153  }
154
155  /**
156   * Creates an object to access a HBase table. Used by HBase internally. DO NOT USE. See
157   * {@link ConnectionFactory} class comment for how to get a {@link Table} instance (use
158   * {@link Table} instead of {@link HTable}).
159   * @param connection           Connection to be used.
160   * @param builder              The table builder
161   * @param rpcCallerFactory     The RPC caller factory
162   * @param rpcControllerFactory The RPC controller factory
163   * @param pool                 ExecutorService to be used.
164   */
165  @InterfaceAudience.Private
166  protected HTable(final ConnectionImplementation connection, final TableBuilderBase builder,
167    final RpcRetryingCallerFactory rpcCallerFactory,
168    final RpcControllerFactory rpcControllerFactory, final ExecutorService pool,
169    final Map<String, byte[]> requestAttributes) {
170    this.connection = Preconditions.checkNotNull(connection, "connection is null");
171    this.configuration = connection.getConfiguration();
172    this.connConfiguration = connection.getConnectionConfiguration();
173    if (pool == null) {
174      this.pool = getDefaultExecutor(this.configuration);
175      this.cleanupPoolOnClose = true;
176    } else {
177      this.pool = pool;
178      this.cleanupPoolOnClose = false;
179    }
180    if (rpcCallerFactory == null) {
181      this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
182    } else {
183      this.rpcCallerFactory = rpcCallerFactory;
184    }
185
186    if (rpcControllerFactory == null) {
187      this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
188    } else {
189      this.rpcControllerFactory = rpcControllerFactory;
190    }
191
192    this.tableName = builder.tableName;
193    this.operationTimeoutMs = builder.operationTimeout;
194    this.rpcTimeoutMs = builder.rpcTimeout;
195    this.readRpcTimeoutMs = builder.readRpcTimeout;
196    this.writeRpcTimeoutMs = builder.writeRpcTimeout;
197    this.scanReadRpcTimeout = builder.scanReadRpcTimeout;
198    this.scanTimeout = builder.scanTimeout;
199    this.scannerCaching = connConfiguration.getScannerCaching();
200    this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
201    this.requestAttributes = requestAttributes;
202
203    // puts need to track errors globally due to how the APIs currently work.
204    multiAp = this.connection.getAsyncProcess();
205    this.locator = new HRegionLocator(tableName, connection);
206  }
207
208  /** Returns maxKeyValueSize from configuration. */
209  public static int getMaxKeyValueSize(Configuration conf) {
210    return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1);
211  }
212
213  @Override
214  public Configuration getConfiguration() {
215    return configuration;
216  }
217
218  @Override
219  public TableName getName() {
220    return tableName;
221  }
222
223  /**
224   * <em>INTERNAL</em> Used by unit tests and tools to do low-level manipulations.
225   * @return A Connection instance.
226   */
227  protected Connection getConnection() {
228    return this.connection;
229  }
230
231  @Override
232  @Deprecated
233  public HTableDescriptor getTableDescriptor() throws IOException {
234    HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
235      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
236    if (htd != null) {
237      return new ImmutableHTableDescriptor(htd);
238    }
239    return null;
240  }
241
242  @Override
243  public TableDescriptor getDescriptor() throws IOException {
244    return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
245      rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
246  }
247
248  /**
249   * Get the corresponding start keys and regions for an arbitrary range of keys.
250   * <p>
251   * @param startKey      Starting row in range, inclusive
252   * @param endKey        Ending row in range
253   * @param includeEndKey true if endRow is inclusive, false if exclusive
254   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
255   *         range
256   * @throws IOException if a remote or network exception occurs
257   */
258  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
259    final byte[] endKey, final boolean includeEndKey) throws IOException {
260    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
261  }
262
263  /**
264   * Get the corresponding start keys and regions for an arbitrary range of keys.
265   * <p>
266   * @param startKey      Starting row in range, inclusive
267   * @param endKey        Ending row in range
268   * @param includeEndKey true if endRow is inclusive, false if exclusive
269   * @param reload        true to reload information or false to use cached information
270   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
271   *         range
272   * @throws IOException if a remote or network exception occurs
273   */
274  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
275    final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
276    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
277    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
278      throw new IllegalArgumentException(
279        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
280    }
281    List<byte[]> keysInRange = new ArrayList<>();
282    List<HRegionLocation> regionsInRange = new ArrayList<>();
283    byte[] currentKey = startKey;
284    do {
285      HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload);
286      keysInRange.add(currentKey);
287      regionsInRange.add(regionLocation);
288      currentKey = regionLocation.getRegionInfo().getEndKey();
289    } while (
290      !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
291        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
292          || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))
293    );
294    return new Pair<>(keysInRange, regionsInRange);
295  }
296
297  /**
298   * The underlying {@link HTable} must not be closed. {@link Table#getScanner(Scan)} has other
299   * usage details.
300   */
301  @Override
302  public ResultScanner getScanner(Scan scan) throws IOException {
303    // Clone to avoid modifying user object from scan internals.
304    // See https://issues.apache.org/jira/browse/HBASE-27402.
305    return getScannerInternal(new Scan(scan), scan);
306  }
307
308  private ResultScanner getScannerInternal(Scan scan, Scan scanForMetrics) throws IOException {
309    final Span span =
310      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build();
311    try (Scope ignored = span.makeCurrent()) {
312      if (scan.getCaching() <= 0) {
313        scan.setCaching(scannerCaching);
314      }
315      if (scan.getMaxResultSize() <= 0) {
316        scan.setMaxResultSize(scannerMaxResultSize);
317      }
318      if (scan.getMvccReadPoint() > 0) {
319        // it is not supposed to be set by user, clear
320        scan.resetMvccReadPoint();
321      }
322      final boolean async = scan.isAsyncPrefetch() != null
323        ? scan.isAsyncPrefetch()
324        : connConfiguration.isClientScannerAsyncPrefetch();
325      final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
326
327      if (scan.isReversed()) {
328        return new ReversedClientScanner(getConfiguration(), scan, scanForMetrics, getName(),
329          connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
330          replicaTimeout, connConfiguration, requestAttributes);
331      } else {
332        if (async) {
333          return new ClientAsyncPrefetchScanner(getConfiguration(), scan, scanForMetrics, getName(),
334            connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
335            scanTimeout, replicaTimeout, connConfiguration, requestAttributes);
336        } else {
337          return new ClientSimpleScanner(getConfiguration(), scan, scanForMetrics, getName(),
338            connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
339            scanTimeout, replicaTimeout, connConfiguration, requestAttributes);
340        }
341      }
342    }
343  }
344
345  /**
346   * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[])} has other
347   * usage details.
348   */
349  @Override
350  public ResultScanner getScanner(byte[] family) throws IOException {
351    Scan scan = new Scan();
352    scan.addFamily(family);
353    return getScannerInternal(scan, scan);
354  }
355
356  /**
357   * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[], byte[])} has
358   * other usage details.
359   */
360  @Override
361  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
362    Scan scan = new Scan();
363    scan.addColumn(family, qualifier);
364    return getScannerInternal(scan, scan);
365  }
366
367  @Override
368  public Result get(final Get get) throws IOException {
369    final Supplier<Span> supplier =
370      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get);
371    return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier);
372  }
373
374  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
375    // if we are changing settings to the get, clone it.
376    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
377      get = ReflectionUtils.newInstance(get.getClass(), get);
378      get.setCheckExistenceOnly(checkExistenceOnly);
379      if (get.getConsistency() == null) {
380        get.setConsistency(DEFAULT_CONSISTENCY);
381      }
382    }
383
384    if (get.getConsistency() == Consistency.STRONG) {
385      final Get configuredGet = get;
386      ClientServiceCallable<Result> callable =
387        new ClientServiceCallable<Result>(this.connection, getName(), get.getRow(),
388          this.rpcControllerFactory.newController(), get.getPriority(), requestAttributes) {
389          @Override
390          protected Result rpcCall() throws Exception {
391            ClientProtos.GetRequest request = RequestConverter
392              .buildGetRequest(getLocation().getRegionInfo().getRegionName(), configuredGet);
393            ClientProtos.GetResponse response = doGet(request);
394            return response == null
395              ? null
396              : ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
397          }
398        };
399      return rpcCallerFactory.<Result> newCaller(readRpcTimeoutMs).callWithRetries(callable,
400        this.operationTimeoutMs);
401    }
402
403    // Call that takes into account the replica
404    RpcRetryingCallerWithReadReplicas callable =
405      new RpcRetryingCallerWithReadReplicas(rpcControllerFactory, tableName, this.connection, get,
406        pool, connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs,
407        connConfiguration.getPrimaryCallTimeoutMicroSecond(), requestAttributes);
408    return callable.call(operationTimeoutMs);
409  }
410
411  @Override
412  public Result[] get(List<Get> gets) throws IOException {
413    final Supplier<Span> supplier =
414      new TableOperationSpanBuilder(connection).setTableName(tableName)
415        .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets);
416    return TraceUtil.trace(() -> {
417      if (gets.size() == 1) {
418        return new Result[] { get(gets.get(0)) };
419      }
420      try {
421        Object[] r1 = new Object[gets.size()];
422        batch((List<? extends Row>) gets, r1, readRpcTimeoutMs);
423        // Translate.
424        Result[] results = new Result[r1.length];
425        int i = 0;
426        for (Object obj : r1) {
427          // Batch ensures if there is a failure we get an exception instead
428          results[i++] = (Result) obj;
429        }
430        return results;
431      } catch (InterruptedException e) {
432        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
433      }
434    }, supplier);
435  }
436
437  @Override
438  public void batch(final List<? extends Row> actions, final Object[] results)
439    throws InterruptedException, IOException {
440    int rpcTimeout = writeRpcTimeoutMs;
441    boolean hasRead = false;
442    boolean hasWrite = false;
443    for (Row action : actions) {
444      if (action instanceof Mutation) {
445        hasWrite = true;
446      } else {
447        hasRead = true;
448      }
449      if (hasRead && hasWrite) {
450        break;
451      }
452    }
453    if (hasRead && !hasWrite) {
454      rpcTimeout = readRpcTimeoutMs;
455    }
456    try {
457      batch(actions, results, rpcTimeout);
458    } catch (InterruptedException e) {
459      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
460    }
461  }
462
463  public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
464    throws InterruptedException, IOException {
465    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
466      .setRowAccess(actions).setResults(results).setRpcTimeout(rpcTimeout)
467      .setOperationTimeout(operationTimeoutMs).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
468      .setRequestAttributes(requestAttributes).build();
469    final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName)
470      .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions)
471      .build();
472    try (Scope ignored = span.makeCurrent()) {
473      AsyncRequestFuture ars = multiAp.submit(task);
474      ars.waitUntilDone();
475      if (ars.hasError()) {
476        TraceUtil.setError(span, ars.getErrors());
477        throw ars.getErrors();
478      }
479      span.setStatus(StatusCode.OK);
480    } finally {
481      span.end();
482    }
483  }
484
485  @Override
486  public <R> void batchCallback(final List<? extends Row> actions, final Object[] results,
487    final Batch.Callback<R> callback) throws IOException, InterruptedException {
488    doBatchWithCallback(actions, results, callback, connection, pool, tableName, requestAttributes);
489  }
490
491  public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
492    Batch.Callback<R> callback, ClusterConnection connection, ExecutorService pool,
493    TableName tableName, Map<String, byte[]> requestAttributes)
494    throws InterruptedIOException, RetriesExhaustedWithDetailsException {
495    int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
496    int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
497      connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
498        HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
499    AsyncProcessTask<R> task =
500      AsyncProcessTask.newBuilder(callback).setPool(pool).setTableName(tableName)
501        .setRowAccess(actions).setResults(results).setOperationTimeout(operationTimeout)
502        .setRpcTimeout(writeTimeout).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
503        .setRequestAttributes(requestAttributes).build();
504    final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName)
505      .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions)
506      .build();
507    try (Scope ignored = span.makeCurrent()) {
508      AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
509      ars.waitUntilDone();
510      if (ars.hasError()) {
511        TraceUtil.setError(span, ars.getErrors());
512        throw ars.getErrors();
513      }
514    } finally {
515      span.end();
516    }
517  }
518
519  @Override
520  public void delete(final Delete delete) throws IOException {
521    final Supplier<Span> supplier =
522      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(delete);
523    TraceUtil.trace(() -> {
524      ClientServiceCallable<Void> callable =
525        new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(),
526          this.rpcControllerFactory.newController(), delete.getPriority(), requestAttributes) {
527          @Override
528          protected Void rpcCall() throws Exception {
529            MutateRequest request = RequestConverter
530              .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete);
531            doMutate(request);
532            return null;
533          }
534        };
535      rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
536        this.operationTimeoutMs);
537    }, supplier);
538  }
539
540  @Override
541  public void delete(final List<Delete> deletes) throws IOException {
542    Object[] results = new Object[deletes.size()];
543    try {
544      batch(deletes, results, writeRpcTimeoutMs);
545    } catch (InterruptedException e) {
546      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
547    } finally {
548      // TODO: to be consistent with batch put(), do not modify input list
549      // mutate list so that it is empty for complete success, or contains only failed records
550      // results are returned in the same order as the requests in list walk the list backwards,
551      // so we can remove from list without impacting the indexes of earlier members
552      for (int i = results.length - 1; i >= 0; i--) {
553        // if result is not null, it succeeded
554        if (results[i] instanceof Result) {
555          deletes.remove(i);
556        }
557      }
558    }
559  }
560
561  @Override
562  public void put(final Put put) throws IOException {
563    final Supplier<Span> supplier =
564      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(put);
565    TraceUtil.trace(() -> {
566      validatePut(put);
567      ClientServiceCallable<Void> callable =
568        new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(),
569          this.rpcControllerFactory.newController(), put.getPriority(), requestAttributes) {
570          @Override
571          protected Void rpcCall() throws Exception {
572            MutateRequest request = RequestConverter
573              .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put);
574            doMutate(request);
575            return null;
576          }
577        };
578      rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
579        this.operationTimeoutMs);
580    }, supplier);
581  }
582
583  @Override
584  public void put(final List<Put> puts) throws IOException {
585    for (Put put : puts) {
586      validatePut(put);
587    }
588    Object[] results = new Object[puts.size()];
589    try {
590      batch(puts, results, writeRpcTimeoutMs);
591    } catch (InterruptedException e) {
592      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
593    }
594  }
595
596  @Override
597  public Result mutateRow(final RowMutations rm) throws IOException {
598    final Supplier<Span> supplier =
599      new TableOperationSpanBuilder(connection).setTableName(tableName)
600        .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(rm);
601    return TraceUtil.trace(() -> {
602      long nonceGroup = getNonceGroup();
603      long nonce = getNonce();
604      CancellableRegionServerCallable<MultiResponse> callable =
605        new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
606          rpcControllerFactory.newController(), writeRpcTimeoutMs,
607          new RetryingTimeTracker().start(), rm.getMaxPriority(), requestAttributes) {
608          @Override
609          protected MultiResponse rpcCall() throws Exception {
610            MultiRequest request = RequestConverter.buildMultiRequest(
611              getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce);
612            ClientProtos.MultiResponse response = doMulti(request);
613            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
614            if (res.hasException()) {
615              Throwable ex = ProtobufUtil.toException(res.getException());
616              if (ex instanceof IOException) {
617                throw (IOException) ex;
618              }
619              throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()),
620                ex);
621            }
622            return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
623          }
624        };
625      Object[] results = new Object[rm.getMutations().size()];
626      AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
627        .setRowAccess(rm.getMutations()).setCallable(callable).setRpcTimeout(writeRpcTimeoutMs)
628        .setOperationTimeout(operationTimeoutMs)
629        .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results)
630        .setRequestAttributes(requestAttributes).build();
631      AsyncRequestFuture ars = multiAp.submit(task);
632      ars.waitUntilDone();
633      if (ars.hasError()) {
634        throw ars.getErrors();
635      }
636      return (Result) results[0];
637    }, supplier);
638  }
639
640  private long getNonceGroup() {
641    return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup();
642  }
643
644  private long getNonce() {
645    return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce();
646  }
647
648  @Override
649  public Result append(final Append append) throws IOException {
650    final Supplier<Span> supplier =
651      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(append);
652    return TraceUtil.trace(() -> {
653      checkHasFamilies(append);
654      NoncedRegionServerCallable<Result> callable =
655        new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
656          this.rpcControllerFactory.newController(), append.getPriority(), requestAttributes) {
657          @Override
658          protected Result rpcCall() throws Exception {
659            MutateRequest request =
660              RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
661                append, super.getNonceGroup(), super.getNonce());
662            MutateResponse response = doMutate(request);
663            if (!response.hasResult()) {
664              return null;
665            }
666            return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
667          }
668        };
669      return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
670        this.operationTimeoutMs);
671    }, supplier);
672  }
673
674  @Override
675  public Result increment(final Increment increment) throws IOException {
676    final Supplier<Span> supplier =
677      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(increment);
678    return TraceUtil.trace(() -> {
679      checkHasFamilies(increment);
680      NoncedRegionServerCallable<Result> callable =
681        new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
682          this.rpcControllerFactory.newController(), increment.getPriority(), requestAttributes) {
683          @Override
684          protected Result rpcCall() throws Exception {
685            MutateRequest request =
686              RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
687                increment, super.getNonceGroup(), super.getNonce());
688            MutateResponse response = doMutate(request);
689            // Should this check for null like append does?
690            return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
691          }
692        };
693      return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable,
694        this.operationTimeoutMs);
695    }, supplier);
696  }
697
698  @Override
699  public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
700    final long amount) throws IOException {
701    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
702  }
703
704  @Override
705  public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
706    final long amount, final Durability durability) throws IOException {
707    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
708      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.INCREMENT);
709    return TraceUtil.trace(() -> {
710      NullPointerException npe = null;
711      if (row == null) {
712        npe = new NullPointerException("row is null");
713      } else if (family == null) {
714        npe = new NullPointerException("family is null");
715      }
716      if (npe != null) {
717        throw new IOException("Invalid arguments to incrementColumnValue", npe);
718      }
719
720      NoncedRegionServerCallable<Long> callable =
721        new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
722          this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET, requestAttributes) {
723          @Override
724          protected Long rpcCall() throws Exception {
725            MutateRequest request = RequestConverter.buildIncrementRequest(
726              getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount,
727              durability, super.getNonceGroup(), super.getNonce());
728            MutateResponse response = doMutate(request);
729            Result result =
730              ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
731            return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
732          }
733        };
734      return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable,
735        this.operationTimeoutMs);
736    }, supplier);
737  }
738
739  @Override
740  @Deprecated
741  public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
742    final byte[] value, final Put put) throws IOException {
743    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
744      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
745      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
746        HBaseSemanticAttributes.Operation.PUT);
747    return TraceUtil.trace(
748      () -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
749        .isSuccess(),
750      supplier);
751  }
752
753  @Override
754  @Deprecated
755  public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
756    final CompareOp compareOp, final byte[] value, final Put put) throws IOException {
757    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
758      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
759      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
760        HBaseSemanticAttributes.Operation.PUT);
761    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
762      toCompareOperator(compareOp), value, null, null, put).isSuccess(), supplier);
763  }
764
765  @Override
766  @Deprecated
767  public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
768    final CompareOperator op, final byte[] value, final Put put) throws IOException {
769    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
770      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
771      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
772        HBaseSemanticAttributes.Operation.PUT);
773    return TraceUtil.trace(
774      () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
775      supplier);
776  }
777
778  @Override
779  @Deprecated
780  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
781    final byte[] value, final Delete delete) throws IOException {
782    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
783      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
784      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
785        HBaseSemanticAttributes.Operation.DELETE);
786    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL,
787      value, null, null, delete).isSuccess(), supplier);
788  }
789
790  @Override
791  @Deprecated
792  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
793    final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
794    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
795      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
796      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
797        HBaseSemanticAttributes.Operation.DELETE);
798    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
799      toCompareOperator(compareOp), value, null, null, delete).isSuccess(), supplier);
800  }
801
802  @Override
803  @Deprecated
804  public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
805    final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
806    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
807      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
808      .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE,
809        HBaseSemanticAttributes.Operation.DELETE);
810    return TraceUtil.trace(
811      () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
812      supplier);
813  }
814
815  @Override
816  @Deprecated
817  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
818    return new CheckAndMutateBuilderImpl(row, family);
819  }
820
821  @Override
822  @Deprecated
823  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
824    return new CheckAndMutateWithFilterBuilderImpl(row, filter);
825  }
826
  /**
   * Runs a conditional {@link RowMutations} as one multi request submitted through
   * {@code multiAp} and returns the first entry of the results array.
   * <p>
   * The condition arguments are forwarded unchanged to
   * {@code RequestConverter.buildMultiRequest}. The callable itself is created with
   * {@code rm.getRow()} and {@code rm.getMaxPriority()}, not with the {@code row} parameter.
   * @param row       row forwarded to the request builder
   * @param family    column family forwarded to the request builder
   * @param qualifier column qualifier forwarded to the request builder
   * @param op        comparison operator forwarded to the request builder
   * @param value     comparison value forwarded to the request builder
   * @param filter    filter forwarded to the request builder
   * @param timeRange time range forwarded to the request builder
   * @param rm        the mutations to apply; also supplies the row and priority of the callable
   * @return {@code results[0]} cast to {@link CheckAndMutateResult}
   * @throws IOException the errors reported by the submitted request, if it has any
   */
  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
    final TimeRange timeRange, final RowMutations rm) throws IOException {
    // Both nonces are obtained once, before the callable exists, and are captured by rpcCall().
    long nonceGroup = getNonceGroup();
    long nonce = getNonce();
    CancellableRegionServerCallable<MultiResponse> callable =
      new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
        rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
        rm.getMaxPriority(), requestAttributes) {
        @Override
        protected MultiResponse rpcCall() throws Exception {
          MultiRequest request =
            RequestConverter.buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row,
              family, qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce);
          ClientProtos.MultiResponse response = doMulti(request);
          // Only the first region action result is inspected for an exception.
          ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
          if (res.hasException()) {
            Throwable ex = ProtobufUtil.toException(res.getException());
            if (ex instanceof IOException) {
              throw (IOException) ex;
            }
            // Anything that is not an IOException is wrapped in one that names the row.
            throw new IOException(
              "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
          }
          return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
        }
      };

    /**
     * Currently, we use one array to store 'processed' flag which is returned by server. It is
     * excessive to send such a large array, but that is required by the framework right now
     */
    Object[] results = new Object[rm.getMutations().size()];
    AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
      .setRowAccess(rm.getMutations()).setResults(results).setCallable(callable)
      // TODO any better timeout?
      .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
      .setOperationTimeout(operationTimeoutMs).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
      .setRequestAttributes(requestAttributes).build();
    AsyncRequestFuture ars = multiAp.submit(task);
    // Block until the submitted task has finished before looking at errors or results.
    ars.waitUntilDone();
    if (ars.hasError()) {
      throw ars.getErrors();
    }

    // The outcome is read from the first slot of the results array.
    return (CheckAndMutateResult) results[0];
  }
874
875  @Override
876  @Deprecated
877  public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
878    final CompareOp compareOp, final byte[] value, final RowMutations rm) throws IOException {
879    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
880      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
881      .setContainerOperations(rm);
882    return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier,
883      toCompareOperator(compareOp), value, null, null, rm).isSuccess(), supplier);
884  }
885
886  @Override
887  @Deprecated
888  public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
889    final CompareOperator op, final byte[] value, final RowMutations rm) throws IOException {
890    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
891      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
892      .setContainerOperations(rm);
893    return TraceUtil.trace(
894      () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
895      supplier);
896  }
897
898  @Override
899  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
900    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
901      .setTableName(tableName).setOperation(checkAndMutate).setContainerOperations(checkAndMutate);
902    return TraceUtil.trace(() -> {
903      Row action = checkAndMutate.getAction();
904      if (
905        action instanceof Put || action instanceof Delete || action instanceof Increment
906          || action instanceof Append
907      ) {
908        if (action instanceof Put) {
909          validatePut((Put) action);
910        }
911        return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
912          checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
913          checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
914      } else {
915        return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
916          checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
917          checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
918      }
919    }, supplier);
920  }
921
922  private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
923    final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
924    final TimeRange timeRange, final Mutation mutation) throws IOException {
925    long nonceGroup = getNonceGroup();
926    long nonce = getNonce();
927    ClientServiceCallable<CheckAndMutateResult> callable =
928      new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
929        this.rpcControllerFactory.newController(), mutation.getPriority(), requestAttributes) {
930        @Override
931        protected CheckAndMutateResult rpcCall() throws Exception {
932          MutateRequest request =
933            RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row,
934              family, qualifier, op, value, filter, timeRange, mutation, nonceGroup, nonce);
935          MutateResponse response = doMutate(request);
936          if (response.hasResult()) {
937            return new CheckAndMutateResult(response.getProcessed(),
938              ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()));
939          }
940          return new CheckAndMutateResult(response.getProcessed(), null);
941        }
942      };
943    return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
944      .callWithRetries(callable, this.operationTimeoutMs);
945  }
946
947  @Override
948  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
949    throws IOException {
950    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
951      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.BATCH)
952      .setContainerOperations(checkAndMutates);
953    return TraceUtil.trace(() -> {
954      if (checkAndMutates.isEmpty()) {
955        return Collections.emptyList();
956      }
957      if (checkAndMutates.size() == 1) {
958        return Collections.singletonList(checkAndMutate(checkAndMutates.get(0)));
959      }
960
961      Object[] results = new Object[checkAndMutates.size()];
962      try {
963        batch(checkAndMutates, results, writeRpcTimeoutMs);
964      } catch (InterruptedException e) {
965        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
966      }
967
968      // translate.
969      List<CheckAndMutateResult> ret = new ArrayList<>(results.length);
970      for (Object r : results) {
971        // Batch ensures if there is a failure we get an exception instead
972        ret.add((CheckAndMutateResult) r);
973      }
974      return ret;
975    }, supplier);
976  }
977
978  private CompareOperator toCompareOperator(CompareOp compareOp) {
979    switch (compareOp) {
980      case LESS:
981        return CompareOperator.LESS;
982
983      case LESS_OR_EQUAL:
984        return CompareOperator.LESS_OR_EQUAL;
985
986      case EQUAL:
987        return CompareOperator.EQUAL;
988
989      case NOT_EQUAL:
990        return CompareOperator.NOT_EQUAL;
991
992      case GREATER_OR_EQUAL:
993        return CompareOperator.GREATER_OR_EQUAL;
994
995      case GREATER:
996        return CompareOperator.GREATER;
997
998      case NO_OP:
999        return CompareOperator.NO_OP;
1000
1001      default:
1002        throw new AssertionError();
1003    }
1004  }
1005
1006  @Override
1007  public boolean exists(final Get get) throws IOException {
1008    final Supplier<Span> supplier =
1009      new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get);
1010    return TraceUtil.trace(() -> {
1011      Result r = get(get, true);
1012      assert r.getExists() != null;
1013      return r.getExists();
1014    }, supplier);
1015  }
1016
1017  @Override
1018  public boolean[] exists(List<Get> gets) throws IOException {
1019    final Supplier<Span> supplier =
1020      new TableOperationSpanBuilder(connection).setTableName(tableName)
1021        .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets);
1022    return TraceUtil.trace(() -> {
1023      if (gets.isEmpty()) {
1024        return new boolean[] {};
1025      }
1026      if (gets.size() == 1) {
1027        return new boolean[] { exists(gets.get(0)) };
1028      }
1029
1030      ArrayList<Get> exists = new ArrayList<>(gets.size());
1031      for (Get g : gets) {
1032        Get ge = new Get(g);
1033        ge.setCheckExistenceOnly(true);
1034        exists.add(ge);
1035      }
1036
1037      Object[] r1 = new Object[exists.size()];
1038      try {
1039        batch(exists, r1, readRpcTimeoutMs);
1040      } catch (InterruptedException e) {
1041        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
1042      }
1043
1044      // translate.
1045      boolean[] results = new boolean[r1.length];
1046      int i = 0;
1047      for (Object o : r1) {
1048        // batch ensures if there is a failure we get an exception instead
1049        results[i++] = ((Result) o).getExists();
1050      }
1051
1052      return results;
1053    }, supplier);
1054  }
1055
1056  /**
1057   * Process a mixed batch of Get, Put and Delete actions. All actions for a RegionServer are
1058   * forwarded in one RPC call. Queries are executed in parallel.
1059   * @param list    The collection of actions.
1060   * @param results An empty array, same size as list. If an exception is thrown, you can test here
1061   *                for partial results, and to determine which actions processed successfully.
1062   * @throws IOException if there are problems talking to META. Per-item exceptions are stored in
1063   *                     the results array.
1064   */
1065  public <R> void processBatchCallback(final List<? extends Row> list, final Object[] results,
1066    final Batch.Callback<R> callback) throws IOException, InterruptedException {
1067    this.batchCallback(list, results, callback);
1068  }
1069
1070  @Override
1071  public void close() throws IOException {
1072    final Supplier<Span> supplier =
1073      new TableSpanBuilder(connection).setName("HTable.close").setTableName(tableName);
1074    TraceUtil.trace(() -> {
1075      if (this.closed) {
1076        return;
1077      }
1078      if (cleanupPoolOnClose) {
1079        this.pool.shutdown();
1080        try {
1081          boolean terminated = false;
1082          do {
1083            // wait until the pool has terminated
1084            terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
1085          } while (!terminated);
1086        } catch (InterruptedException e) {
1087          this.pool.shutdownNow();
1088          LOG.warn("waitForTermination interrupted");
1089        }
1090      }
1091      this.closed = true;
1092    }, supplier);
1093  }
1094
  /**
   * Validates a {@link Put} for well-formedness by delegating to
   * {@code ConnectionUtils.validatePut} with the maximum key-value size taken from
   * {@code connConfiguration}.
   * @param put the put to validate
   * @throws IllegalArgumentException declared for the delegate's validation failures
   */
  private void validatePut(final Put put) throws IllegalArgumentException {
    ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize());
  }
1099
1100  /**
1101   * The pool is used for mutli requests for this HTable
1102   * @return the pool used for mutli
1103   */
1104  ExecutorService getPool() {
1105    return this.pool;
1106  }
1107
1108  /**
1109   * Explicitly clears the region cache to fetch the latest value from META. This is a power user
1110   * function: avoid unless you know the ramifications.
1111   */
1112  public void clearRegionCache() {
1113    this.connection.clearRegionLocationCache();
1114  }
1115
1116  @Override
1117  public CoprocessorRpcChannel coprocessorService(byte[] row) {
1118    return new RegionCoprocessorRpcChannel(connection, tableName, row, requestAttributes);
1119  }
1120
1121  @Override
1122  public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
1123    byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
1124    throws ServiceException, Throwable {
1125    final Map<byte[], R> results =
1126      Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR));
1127    coprocessorService(service, startKey, endKey, callable, (region, row, value) -> {
1128      if (region != null) {
1129        results.put(region, value);
1130      }
1131    });
1132    return results;
1133  }
1134
  /**
   * Invokes {@code callable} once per start key returned by {@code getStartKeysInRange}, each
   * invocation submitted as its own task to the context-wrapped pool, and then waits for all of
   * the tasks to finish. The whole operation runs inside a {@code COPROC_EXEC} trace span.
   * @param service  the coprocessor service class used to create the stub
   * @param startKey start of the row range; passed to {@code getStartKeysInRange}
   * @param endKey   end of the row range; passed to {@code getStartKeysInRange}
   * @param callable the call made against each stub
   * @param callback receives (region, start key, result) after each call when non-null
   * @throws Throwable the cause of the first failed task encountered while waiting, or an
   *                   {@link InterruptedIOException} if the wait is interrupted
   */
  @Override
  public <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey,
    byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback)
    throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      // Wrap the pool with the current OpenTelemetry context before submitting tasks to it.
      final Context context = Context.current();
      final ExecutorService wrappedPool = context.wrap(pool);
      // get regions covered by the row range
      List<byte[]> keys = getStartKeysInRange(startKey, endKey);
      Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (final byte[] r : keys) {
        final RegionCoprocessorRpcChannel channel =
          new RegionCoprocessorRpcChannel(connection, tableName, r, requestAttributes);
        Future<R> future = wrappedPool.submit(() -> {
          T instance =
            org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
          R result = callable.call(instance);
          // The region is read from the channel only after the call has returned.
          byte[] region = channel.getLastRegion();
          if (callback != null) {
            callback.update(region, r, result);
          }
          return result;
        });
        futures.put(r, future);
      }
      // Wait for every task in key order; the first failure aborts the wait.
      for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
        try {
          e.getValue().get();
        } catch (ExecutionException ee) {
          LOG.warn("Error calling coprocessor service {} for row {}", service.getName(),
            Bytes.toStringBinary(e.getKey()), ee);
          // Surface the task's own failure rather than the ExecutionException wrapper.
          throw ee.getCause();
        } catch (InterruptedException ie) {
          throw new InterruptedIOException("Interrupted calling coprocessor service "
            + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
        }
      }
    }, supplier);
  }
1176
1177  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
1178    if (start == null) {
1179      start = HConstants.EMPTY_START_ROW;
1180    }
1181    if (end == null) {
1182      end = HConstants.EMPTY_END_ROW;
1183    }
1184    return getKeysAndRegionsInRange(start, end, true).getFirst();
1185  }
1186
1187  @Override
1188  public long getRpcTimeout(TimeUnit unit) {
1189    return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
1190  }
1191
1192  @Override
1193  @Deprecated
1194  public int getRpcTimeout() {
1195    return rpcTimeoutMs;
1196  }
1197
1198  @Override
1199  @Deprecated
1200  public void setRpcTimeout(int rpcTimeout) {
1201    setReadRpcTimeout(rpcTimeout);
1202    setWriteRpcTimeout(rpcTimeout);
1203  }
1204
1205  @Override
1206  public long getReadRpcTimeout(TimeUnit unit) {
1207    return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
1208  }
1209
1210  @Override
1211  @Deprecated
1212  public int getReadRpcTimeout() {
1213    return readRpcTimeoutMs;
1214  }
1215
1216  @Override
1217  @Deprecated
1218  public void setReadRpcTimeout(int readRpcTimeout) {
1219    this.readRpcTimeoutMs = readRpcTimeout;
1220  }
1221
1222  @Override
1223  public long getWriteRpcTimeout(TimeUnit unit) {
1224    return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
1225  }
1226
1227  @Override
1228  @Deprecated
1229  public int getWriteRpcTimeout() {
1230    return writeRpcTimeoutMs;
1231  }
1232
1233  @Override
1234  @Deprecated
1235  public void setWriteRpcTimeout(int writeRpcTimeout) {
1236    this.writeRpcTimeoutMs = writeRpcTimeout;
1237  }
1238
1239  @Override
1240  public long getOperationTimeout(TimeUnit unit) {
1241    return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
1242  }
1243
1244  @Override
1245  @Deprecated
1246  public int getOperationTimeout() {
1247    return operationTimeoutMs;
1248  }
1249
1250  @Override
1251  @Deprecated
1252  public void setOperationTimeout(int operationTimeout) {
1253    this.operationTimeoutMs = operationTimeout;
1254  }
1255
1256  @Override
1257  public String toString() {
1258    return tableName + ";" + connection;
1259  }
1260
1261  @Override
1262  public <R extends Message> Map<byte[], R> batchCoprocessorService(
1263    Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
1264    R responsePrototype) throws ServiceException, Throwable {
1265    final Map<byte[], R> results =
1266      Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR));
1267    batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
1268      (region, row, result) -> {
1269        if (region != null) {
1270          results.put(region, result);
1271        }
1272      });
1273    return results;
1274  }
1275
  /**
   * Sends one {@code RegionCoprocessorServiceExec} per region in the key range through a newly
   * created {@code AsyncProcess}, decodes each response into a message built from
   * {@code responsePrototype}, and hands it to {@code callback}. The whole operation runs inside
   * a {@code COPROC_EXEC} trace span.
   * @param methodDescriptor  the endpoint method to invoke
   * @param request           the request message sent to every region
   * @param startKey          start of the row range; null is replaced by the empty start row
   * @param endKey            end of the row range; null is replaced by the empty end row
   * @param responsePrototype prototype used to build the response message for each result
   * @param callback          receives (region, row, decoded response) for each result
   * @throws Throwable the errors reported by the request future, or a
   *                   {@code RetriesExhaustedWithDetailsException} collecting the failures seen
   *                   while decoding responses
   */
  @Override
  public <R extends Message> void batchCoprocessorService(
    final Descriptors.MethodDescriptor methodDescriptor, final Message request, byte[] startKey,
    byte[] endKey, final R responsePrototype, final Batch.Callback<R> callback)
    throws ServiceException, Throwable {
    final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
      .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
    TraceUtil.trace(() -> {
      final Context context = Context.current();
      final byte[] sanitizedStartKey =
        Optional.ofNullable(startKey).orElse(HConstants.EMPTY_START_ROW);
      final byte[] sanitizedEndKey = Optional.ofNullable(endKey).orElse(HConstants.EMPTY_END_ROW);

      // get regions covered by the row range
      Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
        getKeysAndRegionsInRange(sanitizedStartKey, sanitizedEndKey, true);
      List<byte[]> keys = keysAndRegions.getFirst();
      List<HRegionLocation> regions = keysAndRegions.getSecond();

      // check if we have any calls to make
      if (keys.isEmpty()) {
        LOG.info("No regions were selected by key range start={}, end={}",
          Bytes.toStringBinary(sanitizedStartKey), Bytes.toStringBinary(sanitizedEndKey));
        return;
      }

      // Build one exec per key; keys and regions are read at the same index.
      List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
      final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
        new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < keys.size(); i++) {
        final byte[] rowKey = keys.get(i);
        final byte[] region = regions.get(i).getRegionInfo().getRegionName();
        RegionCoprocessorServiceExec exec =
          new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
        execs.add(exec);
        execsByRow.put(rowKey, exec);
      }

      // tracking for any possible deserialization errors on success callback
      // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
      final List<Throwable> callbackErrorExceptions = new ArrayList<>();
      final List<Row> callbackErrorActions = new ArrayList<>();
      final List<String> callbackErrorServers = new ArrayList<>();
      Object[] results = new Object[execs.size()];

      // A dedicated AsyncProcess is created for this call instead of reusing a shared one.
      AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
        RpcRetryingCallerFactory.instantiate(configuration, connConfiguration,
          connection.getStatisticsTracker(), connection.getConnectionMetrics()),
        RpcControllerFactory.instantiate(configuration));

      Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
        (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Received result for endpoint {}: region={}, row={}, value={}",
              methodDescriptor.getFullName(), Bytes.toStringBinary(region),
              Bytes.toStringBinary(row), serviceResult.getValue().getValue());
          }
          try {
            Message.Builder builder = responsePrototype.newBuilderForType();
            org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
              serviceResult.getValue().getValue().toByteArray());
            // Unchecked cast: the built message is assumed to have the prototype's type R.
            callback.update(region, row, (R) builder.build());
          } catch (IOException e) {
            LOG.error("Unexpected response type from endpoint {}", methodDescriptor.getFullName(),
              e);
            // Record the failure; it is reported after the request future has completed.
            callbackErrorExceptions.add(e);
            callbackErrorActions.add(execsByRow.get(row));
            // The server entry is recorded as the literal string "null".
            callbackErrorServers.add("null");
          }
        };
      AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
        AsyncProcessTask.newBuilder(resultsCallback).setPool(context.wrap(pool))
          .setTableName(tableName).setRowAccess(execs).setResults(results)
          .setRpcTimeout(readRpcTimeoutMs).setOperationTimeout(operationTimeoutMs)
          .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
          .setRequestAttributes(requestAttributes).build();
      AsyncRequestFuture future = asyncProcess.submit(task);
      future.waitUntilDone();

      // Errors from the request take precedence over errors collected while decoding.
      if (future.hasError()) {
        throw future.getErrors();
      } else if (!callbackErrorExceptions.isEmpty()) {
        throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions,
          callbackErrorActions, callbackErrorServers);
      }
    }, supplier);
  }
1363
1364  @Override
1365  public RegionLocator getRegionLocator() {
1366    return this.locator;
1367  }
1368
1369  private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
1370
1371    private final byte[] row;
1372    private final byte[] family;
1373    private byte[] qualifier;
1374    private TimeRange timeRange;
1375    private CompareOperator op;
1376    private byte[] value;
1377
1378    CheckAndMutateBuilderImpl(byte[] row, byte[] family) {
1379      this.row = Preconditions.checkNotNull(row, "row is null");
1380      this.family = Preconditions.checkNotNull(family, "family is null");
1381    }
1382
1383    @Override
1384    public CheckAndMutateBuilder qualifier(byte[] qualifier) {
1385      this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using"
1386        + " an empty byte array, or just do not call this method if you want a null qualifier");
1387      return this;
1388    }
1389
1390    @Override
1391    public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
1392      this.timeRange = timeRange;
1393      return this;
1394    }
1395
1396    @Override
1397    public CheckAndMutateBuilder ifNotExists() {
1398      this.op = CompareOperator.EQUAL;
1399      this.value = null;
1400      return this;
1401    }
1402
1403    @Override
1404    public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
1405      this.op = Preconditions.checkNotNull(compareOp, "compareOp is null");
1406      this.value = Preconditions.checkNotNull(value, "value is null");
1407      return this;
1408    }
1409
1410    private void preCheck() {
1411      Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by"
1412        + " calling ifNotExists/ifEquals/ifMatches before executing the request");
1413    }
1414
1415    @Override
1416    public boolean thenPut(Put put) throws IOException {
1417      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1418        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
1419      return TraceUtil.trace(() -> {
1420        validatePut(put);
1421        preCheck();
1422        return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
1423          .isSuccess();
1424      }, supplier);
1425    }
1426
1427    @Override
1428    public boolean thenDelete(Delete delete) throws IOException {
1429      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1430        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
1431      return TraceUtil.trace(() -> {
1432        preCheck();
1433        return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
1434          .isSuccess();
1435      }, supplier);
1436    }
1437
1438    @Override
1439    public boolean thenMutate(RowMutations mutation) throws IOException {
1440      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1441        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
1442      return TraceUtil.trace(() -> {
1443        preCheck();
1444        return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation)
1445          .isSuccess();
1446      }, supplier);
1447    }
1448  }
1449
1450  @Override
1451  public Map<String, byte[]> getRequestAttributes() {
1452    return requestAttributes;
1453  }
1454
1455  private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder {
1456
1457    private final byte[] row;
1458    private final Filter filter;
1459    private TimeRange timeRange;
1460
1461    CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
1462      this.row = Preconditions.checkNotNull(row, "row is null");
1463      this.filter = Preconditions.checkNotNull(filter, "filter is null");
1464    }
1465
1466    @Override
1467    public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
1468      this.timeRange = timeRange;
1469      return this;
1470    }
1471
1472    @Override
1473    public boolean thenPut(Put put) throws IOException {
1474      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1475        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
1476      return TraceUtil.trace(() -> {
1477        validatePut(put);
1478        return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
1479      }, supplier);
1480    }
1481
1482    @Override
1483    public boolean thenDelete(Delete delete) throws IOException {
1484      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1485        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
1486      return TraceUtil.trace(
1487        () -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(),
1488        supplier);
1489    }
1490
1491    @Override
1492    public boolean thenMutate(RowMutations mutation) throws IOException {
1493      final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
1494        .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
1495      return TraceUtil
1496        .trace(() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation)
1497          .isSuccess(), supplier);
1498    }
1499  }
1500}