/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
 * The interface for the asynchronous version of Table. Obtain an instance from a
 * {@link AsyncConnection}.
 * <p>
 * The implementation is required to be thread safe.
 * <p>
 * Usually the implementation will not throw any exception directly. You need to get the exception
 * from the returned {@link CompletableFuture}.
 * @since 2.0.0
 */
@InterfaceAudience.Public
public interface AsyncTable<C extends ScanResultConsumerBase> {

  /**
   * Gets the fully qualified table name instance of this table.
   */
  TableName getName();

  /**
   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
   * <p>
   * The reference returned is not a copy, so any change made to it will affect this instance.
   */
  Configuration getConfiguration();

  /**
   * Gets the {@link TableDescriptor} for this table.
   */
  CompletableFuture<TableDescriptor> getDescriptor();

  /**
   * Gets the {@link AsyncTableRegionLocator} for this table.
   */
  AsyncTableRegionLocator getRegionLocator();

  /**
   * Get timeout of each rpc request in this Table instance. It will be overridden by a more
   * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
   * @see #getReadRpcTimeout(TimeUnit)
   * @see #getWriteRpcTimeout(TimeUnit)
   * @param unit the unit of time the timeout to be represented in
   * @return rpc timeout in the specified time unit
   */
  long getRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each rpc read request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return read rpc timeout in the specified time unit
   */
  long getReadRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each rpc write request in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return write rpc timeout in the specified time unit
   */
  long getWriteRpcTimeout(TimeUnit unit);

  /**
   * Get timeout of each operation in this Table instance.
   * @param unit the unit of time the timeout to be represented in
   * @return operation timeout in the specified time unit
   */
  long getOperationTimeout(TimeUnit unit);

  /**
   * Get the timeout of a single operation in a scan. It works like operation timeout for other
   * operations.
   * @param unit the unit of time the timeout to be represented in
   * @return scan rpc timeout in the specified time unit
   */
  long getScanTimeout(TimeUnit unit);

  /**
   * Get the map of request attributes.
   * @return a map of request attributes supplied by the client
   */
  default Map<String, byte[]> getRequestAttributes() {
    return Collections.emptyMap();
  }

  /**
   * Test for the existence of columns in the table, as specified by the Get.
   * <p>
   * This will return true if the Get matches one or more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
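   * <p>
   * For example, a minimal usage sketch (the row key below is illustrative only):
   *
   * <pre>
   * table.exists(new Get(Bytes.toBytes("row1")))
   *   .thenAccept(exists -&gt; System.out.println("exists: " + exists));
   * </pre>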
   * @return true if the specified Get matches one or more keys, false if not. The return value will
   *         be wrapped by a {@link CompletableFuture}.
   */
  default CompletableFuture<Boolean> exists(Get get) {
    return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
  }

  /**
   * Extracts certain cells from a given row.
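   * <p>
   * A minimal usage sketch (the row and column names below are illustrative only):
   *
   * <pre>
   * byte[] family = Bytes.toBytes("cf");
   * byte[] qualifier = Bytes.toBytes("q");
   * table.get(new Get(Bytes.toBytes("row1")).addColumn(family, qualifier)).thenAccept(result -&gt; {
   *   byte[] value = result.getValue(family, qualifier);
   *   System.out.println(value == null ? "not found" : Bytes.toStringBinary(value));
   * });
   * </pre>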
   * @param get The object that specifies what data to fetch and from which row.
   * @return The data coming from the specified row, if it exists. If the row specified doesn't
   *         exist, the {@link Result} instance returned won't contain any
   *         {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
   *         return value will be wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> get(Get get);

  /**
   * Puts some data to the table.
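   * <p>
   * For example, a short sketch (the row, column and value below are illustrative only):
   *
   * <pre>
   * Put put = new Put(Bytes.toBytes("row1")).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"),
   *   Bytes.toBytes("value"));
   * table.put(put).whenComplete((r, error) -&gt; {
   *   if (error != null) {
   *     error.printStackTrace();
   *   } else {
   *     System.out.println("Put succeeded");
   *   }
   * });
   * </pre>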
   * @param put The data to put.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> put(Put put);

  /**
   * Deletes the specified cells/row.
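   * <p>
   * For example, a short sketch that deletes a whole row (the row key is illustrative only):
   *
   * <pre>
   * table.delete(new Delete(Bytes.toBytes("row1")))
   *   .thenRun(() -&gt; System.out.println("Delete finished"));
   * </pre>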
   * @param delete The object that specifies what to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  CompletableFuture<Void> delete(Delete delete);

  /**
   * Appends values to one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Appends are done under a single row lock, so
   * write operations to a row are synchronized, but readers do not take row locks so get and scan
   * operations can see this operation partially completed.
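   * <p>
   * For example, a short sketch (the row and column names are illustrative only):
   *
   * <pre>
   * table.append(new Append(Bytes.toBytes("row1")).addColumn(Bytes.toBytes("cf"),
   *   Bytes.toBytes("q"), Bytes.toBytes("-suffix")))
   *   .thenAccept(result -&gt; System.out.println("Append done"));
   * </pre>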
   * @param append object that specifies the columns and values to be appended
   * @return values of columns after the append operation (may be null). The return value will be
   *         wrapped by a {@link CompletableFuture}.
   */
  CompletableFuture<Result> append(Append append);

  /**
   * Increments one or more columns within a single row.
   * <p>
   * This operation does not appear atomic to readers. Increments are done under a single row lock,
   * so write operations to a row are synchronized, but readers do not take row locks so get and
   * scan operations can see this operation partially completed.
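   * <p>
   * For example, a short sketch that bumps a counter column (names are illustrative only):
   *
   * <pre>
   * byte[] family = Bytes.toBytes("cf");
   * byte[] qualifier = Bytes.toBytes("counter");
   * table.increment(new Increment(Bytes.toBytes("row1")).addColumn(family, qualifier, 1L))
   *   .thenAccept(result -&gt; System.out.println(Bytes.toLong(result.getValue(family, qualifier))));
   * </pre>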
   * @param increment object that specifies the columns and amounts to be used for the increment
   *                  operations
   * @return values of columns after the increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Result> increment(Increment increment);

  /**
   * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
   * <p>
   * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
   * @param row       The row that contains the cell to increment.
   * @param family    The column family of the cell to increment.
   * @param qualifier The column qualifier of the cell to increment.
   * @param amount    The amount to increment the cell with (or decrement, if the amount is
   *                  negative).
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
    long amount) {
    return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
  }

  /**
   * Atomically increments a column value. If the column value already exists and is not a
   * big-endian long, this could throw an exception. If the column value does not yet exist it is
   * initialized to <code>amount</code> and written to the specified column.
   * <p>
   * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
   * any increments that have not been flushed.
   * @param row        The row that contains the cell to increment.
   * @param family     The column family of the cell to increment.
   * @param qualifier  The column qualifier of the cell to increment.
   * @param amount     The amount to increment the cell with (or decrement, if the amount is
   *                   negative).
   * @param durability The persistence guarantee for this increment.
   * @return The new value, post increment. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
    long amount, Durability durability) {
    Preconditions.checkNotNull(row, "row is null");
    Preconditions.checkNotNull(family, "family is null");
    return increment(
      new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
        .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
  }

  /**
   * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
   * adds the Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
   * This is a fluent style API; the code looks like:
   *
   * <pre>
   * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
   *   .thenAccept(succ -&gt; {
   *     if (succ) {
   *       System.out.println("Check and put succeeded");
   *     } else {
   *       System.out.println("Check and put failed");
   *     }
   *   });
   * </pre>
   *
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);

  /**
   * A helper class for sending a checkAndMutate request.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  interface CheckAndMutateBuilder {

    /**
     * Match a qualifier.
     * @param qualifier column qualifier to check.
     */
    CheckAndMutateBuilder qualifier(byte[] qualifier);

    /**
     * Match a timerange.
     * @param timeRange time range to check.
     */
    CheckAndMutateBuilder timeRange(TimeRange timeRange);

    /**
     * Check for lack of column.
     */
    CheckAndMutateBuilder ifNotExists();

    /**
     * Check for equality.
     * @param value the expected value
     */
    default CheckAndMutateBuilder ifEquals(byte[] value) {
      return ifMatches(CompareOperator.EQUAL, value);
    }

    /**
     * Compare a value.
     * @param compareOp comparison operator to use
     * @param value     the expected value
     */
    CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);

    /**
     * Specify a Put to commit if the check succeeds.
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * Specify a Delete to commit if the check succeeds.
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * Specify a RowMutations to commit if the check succeeds.
     * @param mutation mutations to perform if check succeeds
     * @return {@code true} if the new mutation was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * Atomically checks if a row matches the specified filter. If it does, it adds the
   * Put/Delete/RowMutations.
   * <p>
   * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
   * execute it. This is a fluent style API; the code looks like:
   *
   * <pre>
   * table.checkAndMutate(row, filter).thenPut(put).thenAccept(succ -&gt; {
   *   if (succ) {
   *     System.out.println("Check and put succeeded");
   *   } else {
   *     System.out.println("Check and put failed");
   *   }
   * });
   * </pre>
   *
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);

  /**
   * A helper class for sending a checkAndMutate request with a filter.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it
   *             any more.
   */
  @Deprecated
  interface CheckAndMutateWithFilterBuilder {

    /**
     * Match a timerange.
     * @param timeRange time range to check.
     */
    CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);

    /**
     * Specify a Put to commit if the check succeeds.
     * @param put data to put if check succeeds
     * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
     *         will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenPut(Put put);

    /**
     * Specify a Delete to commit if the check succeeds.
     * @param delete data to delete if check succeeds
     * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenDelete(Delete delete);

    /**
     * Specify a RowMutations to commit if the check succeeds.
     * @param mutation mutations to perform if check succeeds
     * @return {@code true} if the new mutation was executed, {@code false} otherwise. The return
     *         value will be wrapped by a {@link CompletableFuture}.
     */
    CompletableFuture<Boolean> thenMutate(RowMutations mutation);
  }

  /**
   * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it
   * performs the specified action.
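   * <p>
   * For example, a short sketch of a conditional put (the row, column and values below are
   * illustrative only):
   *
   * <pre>
   * CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
   *   .ifEquals(family, qualifier, expectedValue).build(put);
   * table.checkAndMutate(checkAndMutate).thenAccept(result -&gt; {
   *   if (result.isSuccess()) {
   *     System.out.println("Conditional update succeeded");
   *   }
   * });
   * </pre>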
   * @param checkAndMutate The CheckAndMutate object.
   * @return A {@link CompletableFuture} that represents the result of the CheckAndMutate.
   */
  CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate);

  /**
   * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense
   * that they are sent to a region server in one RPC, but each CheckAndMutate operation is still
   * executed atomically (and thus, each may fail independently of others).
   * @param checkAndMutates The list of CheckAndMutate.
   * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate.
   */
  List<CompletableFuture<CheckAndMutateResult>>
    checkAndMutate(List<CheckAndMutate> checkAndMutates);

  /**
   * A simple version of batch checkAndMutate. It will fail if there are any failures.
   * @param checkAndMutates The list of rows to apply.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<CheckAndMutateResult>>
    checkAndMutateAll(List<CheckAndMutate> checkAndMutates) {
    return allOf(checkAndMutate(checkAndMutates));
  }

  /**
   * Performs multiple mutations atomically on a single row. Currently {@link Put}, {@link Delete},
   * {@link Increment} and {@link Append} are supported.
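   * <p>
   * For example, a short sketch that applies a put and a delete to the same row atomically
   * (exception handling omitted; {@code put} and {@code delete} are assumed to be prepared
   * elsewhere):
   *
   * <pre>
   * table.mutateRow(new RowMutations(row).add(put).add(delete))
   *   .thenRun(() -&gt; System.out.println("Mutations applied"));
   * </pre>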
   * @param mutation object that specifies the set of mutations to perform atomically
   * @return A {@link CompletableFuture} that returns results of Increment/Append operations
   */
  CompletableFuture<Result> mutateRow(RowMutations mutation);

  /**
   * The scan API uses the observer pattern.
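   * <p>
   * For example, a sketch that assumes {@code table} is an {@code AsyncTable<ScanResultConsumer>}
   * obtained from {@code AsyncConnection.getTable(tableName, executor)}:
   *
   * <pre>
   * table.scan(new Scan().setLimit(10), new ScanResultConsumer() {
   *
   *   public boolean onNext(Result result) {
   *     System.out.println(Bytes.toStringBinary(result.getRow()));
   *     return true;
   *   }
   *
   *   public void onError(Throwable error) {
   *     error.printStackTrace();
   *   }
   *
   *   public void onComplete() {
   *     System.out.println("Scan finished");
   *   }
   * });
   * </pre>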
   * @param scan     A configured {@link Scan} object.
   * @param consumer the consumer used to receive results.
   * @see ScanResultConsumer
   * @see AdvancedScanResultConsumer
   */
  void scan(Scan scan, C consumer);

  /**
   * Gets a scanner on the current table for the given family.
   * @param family The column family to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family) {
    return getScanner(new Scan().addFamily(family));
  }

  /**
   * Gets a scanner on the current table for the given family and qualifier.
   * @param family    The column family to scan.
   * @param qualifier The column qualifier to scan.
   * @return A scanner.
   */
  default ResultScanner getScanner(byte[] family, byte[] qualifier) {
    return getScanner(new Scan().addColumn(family, qualifier));
  }

  /**
   * Returns a scanner on the current table as specified by the {@link Scan} object.
   * @param scan A configured {@link Scan} object.
   * @return A scanner.
   */
  ResultScanner getScanner(Scan scan);

  /**
   * Return all the results that match the given scan object.
   * <p>
   * Notice that usually you should use this method with a {@link Scan} object that has limit set.
   * For example, if you want to get the closest row after a given row, you could do this:
   * <p>
   *
   * <pre>
   * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -&gt; {
   *   if (results.isEmpty()) {
   *     System.out.println("No row after " + Bytes.toStringBinary(row));
   *   } else {
   *     System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
   *       + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
   *   }
   * });
   * </pre>
   * <p>
   * If your result set is very large, you should use the other scan methods to get a scanner or
   * use a callback to process the results. They will do chunking to prevent OOM. The scanAll
   * method will fetch all the results and store them in a List and then return the list to you.
   * <p>
   * The scan metrics will be collected in the background if you enable them, but you have no way
   * to access them through this method. Usually you can get scan metrics from a
   * {@code ResultScanner}, or through {@code ScanResultConsumer.onScanMetricsCreated}, but this
   * method only returns a list of results. So if you really care about scan metrics then you'd
   * better use the other scan methods which return a {@code ResultScanner} or let you pass in a
   * {@code ScanResultConsumer}. There is no performance difference between these scan methods so
   * do not worry.
   * @param scan A configured {@link Scan} object. If you use this method to fetch a really large
   *             result set, it is likely to cause an OOM.
   * @return The results of this small scan operation. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<List<Result>> scanAll(Scan scan);

  /**
   * Test for the existence of columns in the table, as specified by the Gets.
   * <p>
   * This will return a list of booleans. Each value will be true if the related Get matches one or
   * more keys, false if not.
   * <p>
   * This is a server-side call so it prevents any data from being transferred to the client.
   * @param gets the Gets
   * @return A list of {@link CompletableFuture}s that represent the existence for each get.
   */
  default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
    return get(toCheckExistenceOnly(gets)).stream()
      .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
  }

  /**
   * A simple version for batch exists. It will fail if there are any failures and you will get the
   * whole result boolean list at once if the operation succeeds.
   * @param gets the Gets
   * @return A {@link CompletableFuture} that wraps the result boolean list.
   */
  default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
    return allOf(exists(gets));
  }

  /**
   * Extracts certain cells from the given rows, in batch.
   * <p>
   * Notice that you may not get all the results with this function, which means some of the
   * returned {@link CompletableFuture}s may succeed while some of the other returned
   * {@link CompletableFuture}s may fail.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A list of {@link CompletableFuture}s that represent the result for each get.
   */
  List<CompletableFuture<Result>> get(List<Get> gets);

  /**
   * A simple version for batch get. It will fail if there are any failures and you will get the
   * whole result list at once if the operation succeeds.
   * @param gets The objects that specify what data to fetch and from which rows.
   * @return A {@link CompletableFuture} that wraps the result list.
   */
  default CompletableFuture<List<Result>> getAll(List<Get> gets) {
    return allOf(get(gets));
  }

  /**
   * Puts some data in the table, in batch.
   * @param puts The list of mutations to apply.
   * @return A list of {@link CompletableFuture}s that represent the result for each put.
   */
  List<CompletableFuture<Void>> put(List<Put> puts);

  /**
   * A simple version of batch put. It will fail if there are any failures.
   * @param puts The list of mutations to apply.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  default CompletableFuture<Void> putAll(List<Put> puts) {
    return allOf(put(puts)).thenApply(r -> null);
  }

  /**
   * Deletes the specified cells/rows in bulk.
   * @param deletes list of things to delete.
   * @return A list of {@link CompletableFuture}s that represent the result for each delete.
   */
  List<CompletableFuture<Void>> delete(List<Delete> deletes);

  /**
   * A simple version of batch delete. It will fail if there are any failures.
   * @param deletes list of things to delete.
   * @return A {@link CompletableFuture} that always returns null when it completes normally.
   */
  default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
    return allOf(delete(deletes)).thenApply(r -> null);
  }

  /**
   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
   * ordering of execution of the actions is not defined. This means that if you do a Put and a Get
   * in the same {@link #batch} call, the Get is not necessarily guaranteed to return what the Put
   * had put.
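   * <p>
   * For example, a short sketch that sends a put and a get in the same batch (the {@code put} and
   * {@code get} objects are assumed to be prepared elsewhere):
   *
   * <pre>
   * List&lt;Row&gt; actions = Arrays.asList(put, get);
   * table.batchAll(actions).thenAccept(results -&gt; results.forEach(System.out::println));
   * </pre>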
   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
   * @return A list of {@link CompletableFuture}s that represent the result for each action.
   */
  <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);

  /**
   * A simple version of batch. It will fail if there are any failures and you will get the whole
   * result list at once if the operation succeeds.
   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
   * @return A list of the results for the actions. Wrapped by a {@link CompletableFuture}.
   */
  default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
    return allOf(batch(actions));
  }

  /**
   * Execute the given coprocessor call on the region which contains the given {@code row}.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
   * one line lambda expression, like:
   *
   * <pre>
   * channel -&gt; xxxService.newStub(channel)
   * </pre>
   *
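   * <p>
   * A usage sketch, using a hypothetical {@code RowCountService} endpoint (the service and its
   * request/response types are illustrative only; they are not part of this API):
   *
   * <pre>
   * CompletableFuture&lt;CountResponse&gt; future = table.coprocessorService(RowCountService::newStub,
   *   (stub, controller, done) -&gt; stub.count(controller, CountRequest.getDefaultInstance(), done),
   *   row);
   * future.thenAccept(resp -&gt; System.out.println("row count: " + resp.getCount()));
   * </pre>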
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable  a delegation to the actual protobuf rpc call. See the comment of
   *                  {@link ServiceCaller} for more details.
   * @param row       The row key used to identify the remote region location
   * @param <S>       the type of the asynchronous stub
   * @param <R>       the type of the return value
   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
   * @see ServiceCaller
   */
  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, byte[] row);

  /**
   * The callback when we want to execute a coprocessor call on a range of regions.
   * <p>
   * As the locating itself also takes some time, the implementation may want to send rpc calls on
   * the fly, which means we do not know how many regions we have when we get the return value of
   * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
   * passed all the return values to you (through the {@link #onRegionComplete(RegionInfo, Object)}
   * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e., there will be no
   * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
   * calls in the future.
   * <p>
   * Here is some pseudo code describing a typical implementation of a range coprocessor service
   * method, to help you better understand how the {@link CoprocessorCallback} will be called. The
   * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
   * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
   *
   * <pre>
   * locateThenCall(byte[] row) {
   *   locate(row).whenComplete((location, locateError) -&gt; {
   *     if (locateError != null) {
   *       callback.onError(locateError);
   *       return;
   *     }
   *     incPendingCall();
   *     region = location.getRegion();
   *     if (region.getEndKey() > endKey) {
   *       locateEnd = true;
   *     } else {
   *       locateThenCall(region.getEndKey());
   *     }
   *     sendCall().whenComplete((resp, error) -&gt; {
   *       if (error != null) {
   *         callback.onRegionError(region, error);
   *       } else {
   *         callback.onRegionComplete(region, resp);
   *       }
   *       if (locateEnd && decPendingCallAndGet() == 0) {
   *         callback.onComplete();
   *       }
   *     });
   *   });
   * }
   * </pre>
   */
  @InterfaceAudience.Public
  interface CoprocessorCallback<R> {

    /**
     * Indicate that the response of a region is available
     * @param region the region that the response belongs to
     * @param resp   the response of the coprocessor call
     */
    void onRegionComplete(RegionInfo region, R resp);

    /**
     * Indicate that the error for a region is available
     * @param region the region that the error belongs to
     * @param error  the response error of the coprocessor call
     */
    void onRegionError(RegionInfo region, Throwable error);

    /**
     * Indicate that all responses of the regions have been notified by calling
     * {@link #onRegionComplete(RegionInfo, Object)} or
     * {@link #onRegionError(RegionInfo, Throwable)}.
     */
    void onComplete();

    /**
     * Indicate that we got an error which does not belong to any regions. Usually a locating error.
     */
    void onError(Throwable error);
  }

  /**
   * Some coprocessors may support the idea of "partial results." If for some reason a coprocessor
   * cannot return all results for a given region in a single response, the client side can be
   * designed to recognize this and continue requesting more results until they are completely
   * accumulated in the client.
   * <p>
   * It is up to a particular coprocessor implementation and its corresponding clients to agree on
   * what it means for results to be incomplete, how this state is communicated, and how multiple
   * incomplete results are accumulated together.
   * <p>
   * Use this callback when you want to execute a coprocessor call on a range of regions, and that
   * coprocessor may return incomplete results for a given region. See also the docs for
   * {@link CoprocessorCallback}; they all apply to this child interface too.
   */
  @InterfaceAudience.Public
  interface PartialResultCoprocessorCallback<S, R> extends CoprocessorCallback<R> {
    /**
     * Subclasses should implement this to tell AsyncTable whether the given response is "final" or
     * whether the AsyncTable should send another request to the coprocessor to fetch more results
     * from the given region. This method of fetching more results can be used many times until
     * there are no more results to fetch from the region.
     * @param response The response received from the coprocessor
     * @param region   The region the response came from
     * @return A ServiceCaller object if the response was not final and therefore another request is
     *         required to continue fetching results, or null if no more requests need to be sent to
     *         the region.
     */
    ServiceCaller<S, R> getNextCallable(R response, RegionInfo region);

    /**
     * Subclasses should implement this such that, when
     * {@link #getNextCallable(Object, RegionInfo)} returns non-null, this method returns the
     * duration that AsyncTable should wait before sending the next request to the given region.
     * You can use this to create a back-off behavior to reduce load on the RegionServer. If that's
     * not desired, you can always return {@link Duration#ZERO}.
     * @param response The response received from the coprocessor
     * @param region   The region the response came from
     * @return The duration to wait.
     */
    Duration getWaitInterval(R response, RegionInfo region);
  }

  /**
   * Helper class for sending a coprocessorService request that executes a coprocessor call on
   * regions which are covered by a range.
   * <p>
   * If {@code fromRow} is not specified the selection will start with the first table region. If
   * {@code toRow} is not specified the selection will continue through the last table region.
   * @param <S> the type of the protobuf Service you want to call.
   * @param <R> the type of the return value.
   */
  interface CoprocessorServiceBuilder<S, R> {

    /**
     * Specify a start row
     * @param startKey start region selection with region containing this row, inclusive.
     */
    default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
      return fromRow(startKey, true);
    }

    /**
     * Specify a start row
     * @param startKey  start region selection with region containing this row
     * @param inclusive whether to include the startKey
     */
    CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);

    /**
     * Specify a stop row
     * @param endKey select regions up to and including the region containing this row; the endKey
     *               itself is exclusive.
     */
    default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
      return toRow(endKey, false);
    }

    /**
     * Specify a stop row
     * @param endKey    select regions up to and including the region containing this row
     * @param inclusive whether to include the endKey
     */
    CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);

    /**
     * Execute the coprocessorService request. You can get the response through the
     * {@link CoprocessorCallback}.
     */
    void execute();
  }

  /**
   * Execute a coprocessor call on the regions which are covered by a range.
   * <p>
   * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute
   * it.
   * <p>
   * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
   * is only a one line lambda expression, like:
   *
   * <pre>
   * channel -&gt; xxxService.newStub(channel)
   * </pre>
   *
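   * <p>
   * A usage sketch over a row range, reusing the hypothetical {@code RowCountService} endpoint
   * mentioned above (the service, callback and row keys below are illustrative only):
   *
   * <pre>
   * table
   *   .coprocessorService(RowCountService::newStub,
   *     (stub, controller, done) -&gt; stub.count(controller, CountRequest.getDefaultInstance(), done),
   *     callback)
   *   .fromRow(startRow).toRow(endRow).execute();
   * </pre>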
   * @param stubMaker a delegation to the actual {@code newStub} call.
   * @param callable  a delegation to the actual protobuf rpc call. See the comment of
   *                  {@link ServiceCaller} for more details.
   * @param callback  callback to get the response. See the comment of {@link CoprocessorCallback}
   *                  for more details.
   */
  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);

  /**
   * Similar to the method above. Use this when your coprocessor client and endpoint support
   * partial results. If the server does not offer partial results, it is still safe to use this,
   * assuming you implement your
   * {@link PartialResultCoprocessorCallback#getNextCallable(Object, RegionInfo)} correctly.
   */
  <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
    ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback);
}