001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; 022import static org.apache.hadoop.hbase.util.FutureUtils.allOf; 023 024import com.google.protobuf.RpcChannel; 025import java.util.List; 026import java.util.Map; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.TimeUnit; 029import java.util.function.Function; 030import org.apache.commons.lang3.NotImplementedException; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.CompareOperator; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.filter.Filter; 035import org.apache.hadoop.hbase.io.TimeRange; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.yetus.audience.InterfaceAudience; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 040 041/** 042 * The interface for asynchronous version of Table. Obtain an instance from a 043 * {@link AsyncConnection}. 044 * <p> 045 * The implementation is required to be thread safe. 046 * <p> 047 * Usually the implementation will not throw any exception directly. You need to get the exception 048 * from the returned {@link CompletableFuture}. 049 * @since 2.0.0 050 */ 051@InterfaceAudience.Public 052public interface AsyncTable<C extends ScanResultConsumerBase> { 053 054 /** 055 * Gets the fully qualified table name instance of this table. 056 */ 057 TableName getName(); 058 059 /** 060 * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. 061 * <p> 062 * The reference returned is not a copy, so any change made to it will affect this instance. 063 */ 064 Configuration getConfiguration(); 065 066 /** 067 * Gets the {@link TableDescriptor} for this table. 068 */ 069 CompletableFuture<TableDescriptor> getDescriptor(); 070 071 /** 072 * Gets the {@link AsyncTableRegionLocator} for this table. 073 */ 074 AsyncTableRegionLocator getRegionLocator(); 075 076 /** 077 * Get timeout of each rpc request in this Table instance. It will be overridden by a more 078 * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. 079 * @see #getReadRpcTimeout(TimeUnit) 080 * @see #getWriteRpcTimeout(TimeUnit) 081 * @param unit the unit of time the timeout to be represented in 082 * @return rpc timeout in the specified time unit 083 */ 084 long getRpcTimeout(TimeUnit unit); 085 086 /** 087 * Get timeout of each rpc read request in this Table instance. 088 * @param unit the unit of time the timeout to be represented in 089 * @return read rpc timeout in the specified time unit 090 */ 091 long getReadRpcTimeout(TimeUnit unit); 092 093 /** 094 * Get timeout of each rpc write request in this Table instance. 095 * @param unit the unit of time the timeout to be represented in 096 * @return write rpc timeout in the specified time unit 097 */ 098 long getWriteRpcTimeout(TimeUnit unit); 099 100 /** 101 * Get timeout of each operation in Table instance. 102 * @param unit the unit of time the timeout to be represented in 103 * @return operation rpc timeout in the specified time unit 104 */ 105 long getOperationTimeout(TimeUnit unit); 106 107 /** 108 * Get the timeout of a single operation in a scan. It works like operation timeout for other 109 * operations. 110 * @param unit the unit of time the timeout to be represented in 111 * @return scan rpc timeout in the specified time unit 112 */ 113 long getScanTimeout(TimeUnit unit); 114 115 /** 116 * Get the map of request attributes 117 * @return a map of request attributes supplied by the client 118 */ 119 default Map<String, byte[]> getRequestAttributes() { 120 throw new NotImplementedException("Add an implementation!"); 121 } 122 123 /** 124 * Test for the existence of columns in the table, as specified by the Get. 125 * <p> 126 * This will return true if the Get matches one or more keys, false if not. 127 * <p> 128 * This is a server-side call so it prevents any data from being transfered to the client. 129 * @return true if the specified Get matches one or more keys, false if not. The return value will 130 * be wrapped by a {@link CompletableFuture}. 131 */ 132 default CompletableFuture<Boolean> exists(Get get) { 133 return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists()); 134 } 135 136 /** 137 * Extracts certain cells from a given row. 138 * @param get The object that specifies what data to fetch and from which row. 139 * @return The data coming from the specified row, if it exists. If the row specified doesn't 140 * exist, the {@link Result} instance returned won't contain any 141 * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The 142 * return value will be wrapped by a {@link CompletableFuture}. 143 */ 144 CompletableFuture<Result> get(Get get); 145 146 /** 147 * Puts some data to the table. 148 * @param put The data to put. 149 * @return A {@link CompletableFuture} that always returns null when complete normally. 150 */ 151 CompletableFuture<Void> put(Put put); 152 153 /** 154 * Deletes the specified cells/row. 155 * @param delete The object that specifies what to delete. 156 * @return A {@link CompletableFuture} that always returns null when complete normally. 157 */ 158 CompletableFuture<Void> delete(Delete delete); 159 160 /** 161 * Appends values to one or more columns within a single row. 162 * <p> 163 * This operation does not appear atomic to readers. Appends are done under a single row lock, so 164 * write operations to a row are synchronized, but readers do not take row locks so get and scan 165 * operations can see this operation partially completed. 166 * @param append object that specifies the columns and amounts to be used for the increment 167 * operations 168 * @return values of columns after the append operation (maybe null). The return value will be 169 * wrapped by a {@link CompletableFuture}. 170 */ 171 CompletableFuture<Result> append(Append append); 172 173 /** 174 * Increments one or more columns within a single row. 175 * <p> 176 * This operation does not appear atomic to readers. Increments are done under a single row lock, 177 * so write operations to a row are synchronized, but readers do not take row locks so get and 178 * scan operations can see this operation partially completed. 179 * @param increment object that specifies the columns and amounts to be used for the increment 180 * operations 181 * @return values of columns after the increment. The return value will be wrapped by a 182 * {@link CompletableFuture}. 183 */ 184 CompletableFuture<Result> increment(Increment increment); 185 186 /** 187 * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} 188 * <p> 189 * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. 190 * @param row The row that contains the cell to increment. 191 * @param family The column family of the cell to increment. 192 * @param qualifier The column qualifier of the cell to increment. 193 * @param amount The amount to increment the cell with (or decrement, if the amount is 194 * negative). 195 * @return The new value, post increment. The return value will be wrapped by a 196 * {@link CompletableFuture}. 197 */ 198 default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 199 long amount) { 200 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 201 } 202 203 /** 204 * Atomically increments a column value. If the column value already exists and is not a 205 * big-endian long, this could throw an exception. If the column value does not yet exist it is 206 * initialized to <code>amount</code> and written to the specified column. 207 * <p> 208 * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose 209 * any increments that have not been flushed. 210 * @param row The row that contains the cell to increment. 211 * @param family The column family of the cell to increment. 212 * @param qualifier The column qualifier of the cell to increment. 213 * @param amount The amount to increment the cell with (or decrement, if the amount is 214 * negative). 215 * @param durability The persistence guarantee for this increment. 216 * @return The new value, post increment. The return value will be wrapped by a 217 * {@link CompletableFuture}. 218 */ 219 default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, 220 long amount, Durability durability) { 221 Preconditions.checkNotNull(row, "row is null"); 222 Preconditions.checkNotNull(family, "family is null"); 223 return increment( 224 new Increment(row).addColumn(family, qualifier, amount).setDurability(durability)) 225 .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier))); 226 } 227 228 /** 229 * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it 230 * adds the Put/Delete/RowMutations. 231 * <p> 232 * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it. 233 * This is a fluent style API, the code is like: 234 * 235 * <pre> 236 * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put) 237 * .thenAccept(succ -> { 238 * if (succ) { 239 * System.out.println("Check and put succeeded"); 240 * } else { 241 * System.out.println("Check and put failed"); 242 * } 243 * }); 244 * </pre> 245 * 246 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 247 * any more. 248 */ 249 @Deprecated 250 CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family); 251 252 /** 253 * A helper class for sending checkAndMutate request. 254 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 255 * any more. 256 */ 257 @Deprecated 258 interface CheckAndMutateBuilder { 259 260 /** 261 * Match a qualifier. 262 * @param qualifier column qualifier to check. 263 */ 264 CheckAndMutateBuilder qualifier(byte[] qualifier); 265 266 /** 267 * Match a timerange. 268 * @param timeRange time range to check. 269 */ 270 CheckAndMutateBuilder timeRange(TimeRange timeRange); 271 272 /** 273 * Check for lack of column. 274 */ 275 CheckAndMutateBuilder ifNotExists(); 276 277 /** 278 * Check for equality. 279 * @param value the expected value 280 */ 281 default CheckAndMutateBuilder ifEquals(byte[] value) { 282 return ifMatches(CompareOperator.EQUAL, value); 283 } 284 285 /** 286 * Compare a value 287 * @param compareOp comparison operator to use 288 * @param value the expected value 289 */ 290 CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value); 291 292 /** 293 * Specify a Put to commit if the check succeeds. 294 * @param put data to put if check succeeds 295 * @return {@code true} if the new put was executed, {@code false} otherwise. The return value 296 * will be wrapped by a {@link CompletableFuture}. 297 */ 298 CompletableFuture<Boolean> thenPut(Put put); 299 300 /** 301 * Specify a Delete to commit if the check succeeds. 302 * @param delete data to delete if check succeeds 303 * @return {@code true} if the new delete was executed, {@code false} otherwise. The return 304 * value will be wrapped by a {@link CompletableFuture}. 305 */ 306 CompletableFuture<Boolean> thenDelete(Delete delete); 307 308 /** 309 * Specify a RowMutations to commit if the check succeeds. 310 * @param mutation mutations to perform if check succeeds 311 * @return true if the new mutation was executed, false otherwise. The return value will be 312 * wrapped by a {@link CompletableFuture}. 313 */ 314 CompletableFuture<Boolean> thenMutate(RowMutations mutation); 315 } 316 317 /** 318 * Atomically checks if a row matches the specified filter. If it does, it adds the 319 * Put/Delete/RowMutations. 320 * <p> 321 * Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then 322 * execute it. This is a fluent style API, the code is like: 323 * 324 * <pre> 325 * table.checkAndMutate(row, filter).thenPut(put).thenAccept(succ -> { 326 * if (succ) { 327 * System.out.println("Check and put succeeded"); 328 * } else { 329 * System.out.println("Check and put failed"); 330 * } 331 * }); 332 * </pre> 333 * 334 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 335 * any more. 336 */ 337 @Deprecated 338 CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter); 339 340 /** 341 * A helper class for sending checkAndMutate request with a filter. 342 * @deprecated Since 2.4.0, will be removed in 4.0.0. For internal test use only, do not use it 343 * any more. 344 */ 345 @Deprecated 346 interface CheckAndMutateWithFilterBuilder { 347 348 /** 349 * Match a timerange. 350 * @param timeRange time range to check. 351 */ 352 CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange); 353 354 /** 355 * Specify a Put to commit if the check succeeds. 356 * @param put data to put if check succeeds 357 * @return {@code true} if the new put was executed, {@code false} otherwise. The return value 358 * will be wrapped by a {@link CompletableFuture}. 359 */ 360 CompletableFuture<Boolean> thenPut(Put put); 361 362 /** 363 * Specify a Delete to commit if the check succeeds. 364 * @param delete data to delete if check succeeds 365 * @return {@code true} if the new delete was executed, {@code false} otherwise. The return 366 * value will be wrapped by a {@link CompletableFuture}. 367 */ 368 CompletableFuture<Boolean> thenDelete(Delete delete); 369 370 /** 371 * Specify a RowMutations to commit if the check succeeds. 372 * @param mutation mutations to perform if check succeeds 373 * @return true if the new mutation was executed, false otherwise. The return value will be 374 * wrapped by a {@link CompletableFuture}. 375 */ 376 CompletableFuture<Boolean> thenMutate(RowMutations mutation); 377 } 378 379 /** 380 * checkAndMutate that atomically checks if a row matches the specified condition. If it does, it 381 * performs the specified action. 382 * @param checkAndMutate The CheckAndMutate object. 383 * @return A {@link CompletableFuture}s that represent the result for the CheckAndMutate. 384 */ 385 CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate); 386 387 /** 388 * Batch version of checkAndMutate. The specified CheckAndMutates are batched only in the sense 389 * that they are sent to a RS in one RPC, but each CheckAndMutate operation is still executed 390 * atomically (and thus, each may fail independently of others). 391 * @param checkAndMutates The list of CheckAndMutate. 392 * @return A list of {@link CompletableFuture}s that represent the result for each CheckAndMutate. 393 */ 394 List<CompletableFuture<CheckAndMutateResult>> 395 checkAndMutate(List<CheckAndMutate> checkAndMutates); 396 397 /** 398 * A simple version of batch checkAndMutate. It will fail if there are any failures. 399 * @param checkAndMutates The list of rows to apply. 400 * @return A {@link CompletableFuture} that wrapper the result list. 401 */ 402 default CompletableFuture<List<CheckAndMutateResult>> 403 checkAndMutateAll(List<CheckAndMutate> checkAndMutates) { 404 return allOf(checkAndMutate(checkAndMutates)); 405 } 406 407 /** 408 * Performs multiple mutations atomically on a single row. Currently {@link Put} and 409 * {@link Delete} are supported. 410 * @param mutation object that specifies the set of mutations to perform atomically 411 * @return A {@link CompletableFuture} that returns results of Increment/Append operations 412 */ 413 CompletableFuture<Result> mutateRow(RowMutations mutation); 414 415 /** 416 * The scan API uses the observer pattern. 417 * @param scan A configured {@link Scan} object. 418 * @param consumer the consumer used to receive results. 419 * @see ScanResultConsumer 420 * @see AdvancedScanResultConsumer 421 */ 422 void scan(Scan scan, C consumer); 423 424 /** 425 * Gets a scanner on the current table for the given family. 426 * @param family The column family to scan. 427 * @return A scanner. 428 */ 429 default ResultScanner getScanner(byte[] family) { 430 return getScanner(new Scan().addFamily(family)); 431 } 432 433 /** 434 * Gets a scanner on the current table for the given family and qualifier. 435 * @param family The column family to scan. 436 * @param qualifier The column qualifier to scan. 437 * @return A scanner. 438 */ 439 default ResultScanner getScanner(byte[] family, byte[] qualifier) { 440 return getScanner(new Scan().addColumn(family, qualifier)); 441 } 442 443 /** 444 * Returns a scanner on the current table as specified by the {@link Scan} object. 445 * @param scan A configured {@link Scan} object. 446 * @return A scanner. 447 */ 448 ResultScanner getScanner(Scan scan); 449 450 /** 451 * Return all the results that match the given scan object. 452 * <p> 453 * Notice that usually you should use this method with a {@link Scan} object that has limit set. 454 * For example, if you want to get the closest row after a given row, you could do this: 455 * <p> 456 * 457 * <pre> 458 * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> { 459 * if (results.isEmpty()) { 460 * System.out.println("No row after " + Bytes.toStringBinary(row)); 461 * } else { 462 * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is " 463 * + Bytes.toStringBinary(results.stream().findFirst().get().getRow())); 464 * } 465 * }); 466 * </pre> 467 * <p> 468 * If your result set is very large, you should use other scan method to get a scanner or use 469 * callback to process the results. They will do chunking to prevent OOM. The scanAll method will 470 * fetch all the results and store them in a List and then return the list to you. 471 * <p> 472 * The scan metrics will be collected background if you enable it but you have no way to get it. 473 * Usually you can get scan metrics from {@code ResultScanner}, or through 474 * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results. 475 * So if you really care about scan metrics then you'd better use other scan methods which return 476 * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no 477 * performance difference between these scan methods so do not worry. 478 * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large 479 * result set, it is likely to cause OOM. 480 * @return The results of this small scan operation. The return value will be wrapped by a 481 * {@link CompletableFuture}. 482 */ 483 CompletableFuture<List<Result>> scanAll(Scan scan); 484 485 /** 486 * Test for the existence of columns in the table, as specified by the Gets. 487 * <p> 488 * This will return a list of booleans. Each value will be true if the related Get matches one or 489 * more keys, false if not. 490 * <p> 491 * This is a server-side call so it prevents any data from being transferred to the client. 492 * @param gets the Gets 493 * @return A list of {@link CompletableFuture}s that represent the existence for each get. 494 */ 495 default List<CompletableFuture<Boolean>> exists(List<Get> gets) { 496 return get(toCheckExistenceOnly(gets)).stream() 497 .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList()); 498 } 499 500 /** 501 * A simple version for batch exists. It will fail if there are any failures and you will get the 502 * whole result boolean list at once if the operation is succeeded. 503 * @param gets the Gets 504 * @return A {@link CompletableFuture} that wrapper the result boolean list. 505 */ 506 default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) { 507 return allOf(exists(gets)); 508 } 509 510 /** 511 * Extracts certain cells from the given rows, in batch. 512 * <p> 513 * Notice that you may not get all the results with this function, which means some of the 514 * returned {@link CompletableFuture}s may succeed while some of the other returned 515 * {@link CompletableFuture}s may fail. 516 * @param gets The objects that specify what data to fetch and from which rows. 517 * @return A list of {@link CompletableFuture}s that represent the result for each get. 518 */ 519 List<CompletableFuture<Result>> get(List<Get> gets); 520 521 /** 522 * A simple version for batch get. It will fail if there are any failures and you will get the 523 * whole result list at once if the operation is succeeded. 524 * @param gets The objects that specify what data to fetch and from which rows. 525 * @return A {@link CompletableFuture} that wrapper the result list. 526 */ 527 default CompletableFuture<List<Result>> getAll(List<Get> gets) { 528 return allOf(get(gets)); 529 } 530 531 /** 532 * Puts some data in the table, in batch. 533 * @param puts The list of mutations to apply. 534 * @return A list of {@link CompletableFuture}s that represent the result for each put. 535 */ 536 List<CompletableFuture<Void>> put(List<Put> puts); 537 538 /** 539 * A simple version of batch put. It will fail if there are any failures. 540 * @param puts The list of mutations to apply. 541 * @return A {@link CompletableFuture} that always returns null when complete normally. 542 */ 543 default CompletableFuture<Void> putAll(List<Put> puts) { 544 return allOf(put(puts)).thenApply(r -> null); 545 } 546 547 /** 548 * Deletes the specified cells/rows in bulk. 549 * @param deletes list of things to delete. 550 * @return A list of {@link CompletableFuture}s that represent the result for each delete. 551 */ 552 List<CompletableFuture<Void>> delete(List<Delete> deletes); 553 554 /** 555 * A simple version of batch delete. It will fail if there are any failures. 556 * @param deletes list of things to delete. 557 * @return A {@link CompletableFuture} that always returns null when complete normally. 558 */ 559 default CompletableFuture<Void> deleteAll(List<Delete> deletes) { 560 return allOf(delete(deletes)).thenApply(r -> null); 561 } 562 563 /** 564 * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The 565 * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the 566 * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the 567 * Put had put. 568 * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects 569 * @return A list of {@link CompletableFuture}s that represent the result for each action. 570 */ 571 <T> List<CompletableFuture<T>> batch(List<? extends Row> actions); 572 573 /** 574 * A simple version of batch. It will fail if there are any failures and you will get the whole 575 * result list at once if the operation is succeeded. 576 * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects 577 * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. 578 */ 579 default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) { 580 return allOf(batch(actions)); 581 } 582 583 /** 584 * Execute the given coprocessor call on the region which contains the given {@code row}. 585 * <p> 586 * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a 587 * one line lambda expression, like: 588 * 589 * <pre> 590 * channel -> xxxService.newStub(channel) 591 * </pre> 592 * 593 * @param stubMaker a delegation to the actual {@code newStub} call. 594 * @param callable a delegation to the actual protobuf rpc call. See the comment of 595 * {@link ServiceCaller} for more details. 596 * @param row The row key used to identify the remote region location 597 * @param <S> the type of the asynchronous stub 598 * @param <R> the type of the return value 599 * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. 600 * @see ServiceCaller 601 */ 602 <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 603 ServiceCaller<S, R> callable, byte[] row); 604 605 /** 606 * The callback when we want to execute a coprocessor call on a range of regions. 607 * <p> 608 * As the locating itself also takes some time, the implementation may want to send rpc calls on 609 * the fly, which means we do not know how many regions we have when we get the return value of 610 * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have 611 * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)} 612 * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no 613 * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)} 614 * calls in the future. 615 * <p> 616 * Here is a pseudo code to describe a typical implementation of a range coprocessor service 617 * method to help you better understand how the {@link CoprocessorCallback} will be called. The 618 * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the 619 * {@code whenComplete} is {@code CompletableFuture.whenComplete}. 620 * 621 * <pre> 622 * locateThenCall(byte[] row) { 623 * locate(row).whenComplete((location, locateError) -> { 624 * if (locateError != null) { 625 * callback.onError(locateError); 626 * return; 627 * } 628 * incPendingCall(); 629 * region = location.getRegion(); 630 * if (region.getEndKey() > endKey) { 631 * locateEnd = true; 632 * } else { 633 * locateThenCall(region.getEndKey()); 634 * } 635 * sendCall().whenComplete((resp, error) -> { 636 * if (error != null) { 637 * callback.onRegionError(region, error); 638 * } else { 639 * callback.onRegionComplete(region, resp); 640 * } 641 * if (locateEnd && decPendingCallAndGet() == 0) { 642 * callback.onComplete(); 643 * } 644 * }); 645 * }); 646 * } 647 * </pre> 648 */ 649 @InterfaceAudience.Public 650 interface CoprocessorCallback<R> { 651 652 /** 653 * Indicate that the respose of a region is available 654 * @param region the region that the response belongs to 655 * @param resp the response of the coprocessor call 656 */ 657 void onRegionComplete(RegionInfo region, R resp); 658 659 /** 660 * Indicate that the error for a region is available 661 * @param region the region that the error belongs to 662 * @param error the response error of the coprocessor call 663 */ 664 void onRegionError(RegionInfo region, Throwable error); 665 666 /** 667 * Indicate that all responses of the regions have been notified by calling 668 * {@link #onRegionComplete(RegionInfo, Object)} or 669 * {@link #onRegionError(RegionInfo, Throwable)}. 670 */ 671 void onComplete(); 672 673 /** 674 * Indicate that we got an error which does not belong to any regions. Usually a locating error. 675 */ 676 void onError(Throwable error); 677 } 678 679 /** 680 * Helper class for sending coprocessorService request that executes a coprocessor call on regions 681 * which are covered by a range. 682 * <p> 683 * If {@code fromRow} is not specified the selection will start with the first table region. If 684 * {@code toRow} is not specified the selection will continue through the last table region. 685 * @param <S> the type of the protobuf Service you want to call. 686 * @param <R> the type of the return value. 687 */ 688 interface CoprocessorServiceBuilder<S, R> { 689 690 /** 691 * Specify a start row 692 * @param startKey start region selection with region containing this row, inclusive. 693 */ 694 default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) { 695 return fromRow(startKey, true); 696 } 697 698 /** 699 * Specify a start row 700 * @param startKey start region selection with region containing this row 701 * @param inclusive whether to include the startKey 702 */ 703 CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive); 704 705 /** 706 * Specify a stop row 707 * @param endKey select regions up to and including the region containing this row, exclusive. 708 */ 709 default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) { 710 return toRow(endKey, false); 711 } 712 713 /** 714 * Specify a stop row 715 * @param endKey select regions up to and including the region containing this row 716 * @param inclusive whether to include the endKey 717 */ 718 CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive); 719 720 /** 721 * Execute the coprocessorService request. You can get the response through the 722 * {@link CoprocessorCallback}. 723 */ 724 void execute(); 725 } 726 727 /** 728 * Execute a coprocessor call on the regions which are covered by a range. 729 * <p> 730 * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it. 731 * <p> 732 * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it 733 * is only a one line lambda expression, like: 734 * 735 * <pre> 736 * channel -> xxxService.newStub(channel) 737 * </pre> 738 * 739 * @param stubMaker a delegation to the actual {@code newStub} call. 740 * @param callable a delegation to the actual protobuf rpc call. See the comment of 741 * {@link ServiceCaller} for more details. 742 * @param callback callback to get the response. See the comment of {@link CoprocessorCallback} 743 * for more details. 744 */ 745 <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker, 746 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback); 747}