001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; 026import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 027import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; 028import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 029 030import com.google.protobuf.RpcChannel; 031import io.opentelemetry.api.trace.Span; 032import io.opentelemetry.api.trace.StatusCode; 033import io.opentelemetry.context.Scope; 034import java.io.IOException; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.List; 038import java.util.Map; 039import java.util.concurrent.CompletableFuture; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.function.Function; 044import java.util.function.Supplier; 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.hbase.CompareOperator; 047import org.apache.hadoop.hbase.DoNotRetryIOException; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.HRegionLocation; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; 052import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 053import org.apache.hadoop.hbase.filter.Filter; 054import org.apache.hadoop.hbase.io.TimeRange; 055import org.apache.hadoop.hbase.ipc.HBaseRpcController; 056import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 057import org.apache.hadoop.hbase.trace.TraceUtil; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.ReflectionUtils; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 065import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 066import org.apache.hbase.thirdparty.io.netty.util.Timer; 067 068import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 069import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 070import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 075import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 078 079/** 080 * The implementation of RawAsyncTable. 081 * <p/> 082 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will 083 * be finished inside the rpc framework thread, which means that the callbacks registered to the 084 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use 085 * this class should not try to do time consuming tasks in the callbacks. 086 * @since 2.0.0 087 * @see AsyncTableImpl 088 */ 089@InterfaceAudience.Private 090class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { 091 092 private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); 093 094 private final AsyncConnectionImpl conn; 095 096 private final Timer retryTimer; 097 098 private final TableName tableName; 099 100 private final int defaultScannerCaching; 101 102 private final long defaultScannerMaxResultSize; 103 104 private final long rpcTimeoutNs; 105 106 private final long readRpcTimeoutNs; 107 108 private final long writeRpcTimeoutNs; 109 110 private final long operationTimeoutNs; 111 112 private final long scanTimeoutNs; 113 114 private final long pauseNs; 115 116 private final long pauseNsForServerOverloaded; 117 118 private final int maxAttempts; 119 120 private final int startLogErrorsCnt; 121 122 private final Map<String, byte[]> requestAttributes; 123 124 RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) { 125 this.conn = conn; 126 this.retryTimer = retryTimer; 127 this.tableName = builder.tableName; 128 this.rpcTimeoutNs = builder.rpcTimeoutNs; 129 this.readRpcTimeoutNs = builder.readRpcTimeoutNs; 130 this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs; 131 this.operationTimeoutNs = builder.operationTimeoutNs; 132 this.scanTimeoutNs = builder.scanTimeoutNs; 133 this.pauseNs = builder.pauseNs; 134 if (builder.pauseNsForServerOverloaded < builder.pauseNs) { 135 LOG.warn( 136 "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" 137 + " the normal pause value {} ms, use the greater one instead", 138 TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), 139 TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); 140 this.pauseNsForServerOverloaded = builder.pauseNs; 141 } else { 142 this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; 143 } 144 this.maxAttempts = builder.maxAttempts; 145 this.startLogErrorsCnt = builder.startLogErrorsCnt; 146 this.defaultScannerCaching = tableName.isSystemTable() 147 ? conn.connConf.getMetaScannerCaching() 148 : conn.connConf.getScannerCaching(); 149 this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); 150 this.requestAttributes = builder.requestAttributes; 151 } 152 153 @Override 154 public TableName getName() { 155 return tableName; 156 } 157 158 @Override 159 public Configuration getConfiguration() { 160 return conn.getConfiguration(); 161 } 162 163 @Override 164 public CompletableFuture<TableDescriptor> getDescriptor() { 165 return conn.getAdmin().getDescriptor(tableName); 166 } 167 168 @Override 169 public AsyncTableRegionLocator getRegionLocator() { 170 return conn.getRegionLocator(tableName); 171 } 172 173 @FunctionalInterface 174 private interface Converter<D, I, S> { 175 D convert(I info, S src) throws IOException; 176 } 177 178 @FunctionalInterface 179 private interface RpcCall<RESP, REQ> { 180 void call(ClientService.Interface stub, HBaseRpcController controller, REQ req, 181 RpcCallback<RESP> done); 182 } 183 184 private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call( 185 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 186 Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall, 187 Converter<RESP, HBaseRpcController, PRESP> respConverter) { 188 CompletableFuture<RESP> future = new CompletableFuture<>(); 189 try { 190 rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req), 191 new RpcCallback<PRESP>() { 192 193 @Override 194 public void run(PRESP resp) { 195 if (controller.failed()) { 196 future.completeExceptionally(controller.getFailed()); 197 } else { 198 try { 199 future.complete(respConverter.convert(controller, resp)); 200 } catch (IOException e) { 201 future.completeExceptionally(e); 202 } 203 } 204 } 205 }); 206 } catch (IOException e) { 207 future.completeExceptionally(e); 208 } 209 return future; 210 } 211 212 private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller, 213 HRegionLocation loc, ClientService.Interface stub, REQ req, 214 Converter<MutateRequest, byte[], REQ> reqConvert, 215 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 216 return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done), 217 respConverter); 218 } 219 220 private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller, 221 HRegionLocation loc, ClientService.Interface stub, REQ req, 222 Converter<MutateRequest, byte[], REQ> reqConvert) { 223 return mutate(controller, loc, stub, req, reqConvert, (c, resp) -> { 224 return null; 225 }); 226 } 227 228 private static Result toResult(HBaseRpcController controller, MutateResponse resp) 229 throws IOException { 230 if (!resp.hasResult()) { 231 return null; 232 } 233 return ProtobufUtil.toResult(resp.getResult(), controller.cellScanner()); 234 } 235 236 @FunctionalInterface 237 private interface NoncedConverter<D, I, S> { 238 D convert(I info, S src, long nonceGroup, long nonce) throws IOException; 239 } 240 241 private <REQ, RESP> CompletableFuture<RESP> noncedMutate(long nonceGroup, long nonce, 242 HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req, 243 NoncedConverter<MutateRequest, byte[], REQ> reqConvert, 244 Converter<RESP, HBaseRpcController, MutateResponse> respConverter) { 245 return mutate(controller, loc, stub, req, 246 (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); 247 } 248 249 private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, long rpcTimeoutNs) { 250 return conn.callerFactory.<T> single().table(tableName).row(row).priority(priority) 251 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) 252 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 253 .pause(pauseNs, TimeUnit.NANOSECONDS) 254 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 255 .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes) 256 .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes); 257 } 258 259 private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T> 260 newCaller(R row, long rpcTimeoutNs) { 261 return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); 262 } 263 264 private CompletableFuture<Result> get(Get get, int replicaId) { 265 return this.<Result, Get> newCaller(get, readRpcTimeoutNs) 266 .action((controller, loc, stub) -> RawAsyncTableImpl.<Get, GetRequest, GetResponse, 267 Result> call(controller, loc, stub, get, RequestConverter::buildGetRequest, 268 (s, c, req, done) -> s.get(c, req, done), 269 (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()))) 270 .replicaId(replicaId).call(); 271 } 272 273 private TableOperationSpanBuilder newTableOperationSpanBuilder() { 274 return new TableOperationSpanBuilder(conn).setTableName(tableName); 275 } 276 277 @Override 278 public CompletableFuture<Result> get(Get get) { 279 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(get); 280 return tracedFuture( 281 () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), 282 RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, 283 conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), 284 supplier); 285 } 286 287 @Override 288 public CompletableFuture<Void> put(Put put) { 289 validatePut(put, conn.connConf.getMaxKeyValueSize()); 290 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(put); 291 return tracedFuture(() -> this.<Void, Put> newCaller(put, writeRpcTimeoutNs) 292 .action((controller, loc, stub) -> RawAsyncTableImpl.<Put> voidMutate(controller, loc, stub, 293 put, RequestConverter::buildMutateRequest)) 294 .call(), supplier); 295 } 296 297 @Override 298 public CompletableFuture<Void> delete(Delete delete) { 299 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(delete); 300 return tracedFuture(() -> this.<Void, Delete> newCaller(delete, writeRpcTimeoutNs) 301 .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete> voidMutate(controller, loc, 302 stub, delete, RequestConverter::buildMutateRequest)) 303 .call(), supplier); 304 } 305 306 @Override 307 public CompletableFuture<Result> append(Append append) { 308 checkHasFamilies(append); 309 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(append); 310 return tracedFuture(() -> { 311 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 312 long nonce = conn.getNonceGenerator().newNonce(); 313 return this.<Result, Append> newCaller(append, rpcTimeoutNs) 314 .action((controller, loc, stub) -> this.<Append, Result> noncedMutate(nonceGroup, nonce, 315 controller, loc, stub, append, RequestConverter::buildMutateRequest, 316 RawAsyncTableImpl::toResult)) 317 .call(); 318 }, supplier); 319 } 320 321 @Override 322 public CompletableFuture<Result> increment(Increment increment) { 323 checkHasFamilies(increment); 324 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(increment); 325 return tracedFuture(() -> { 326 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 327 long nonce = conn.getNonceGenerator().newNonce(); 328 return this.<Result, Increment> newCaller(increment, rpcTimeoutNs) 329 .action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(nonceGroup, nonce, 330 controller, loc, stub, increment, RequestConverter::buildMutateRequest, 331 RawAsyncTableImpl::toResult)) 332 .call(); 333 }, supplier); 334 } 335 336 private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 337 338 private final byte[] row; 339 340 private final byte[] family; 341 342 private byte[] qualifier; 343 344 private TimeRange timeRange; 345 346 private CompareOperator op; 347 348 private byte[] value; 349 350 public CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 351 this.row = Preconditions.checkNotNull(row, "row is null"); 352 this.family = Preconditions.checkNotNull(family, "family is null"); 353 } 354 355 @Override 356 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 357 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 358 + " an empty byte array, or just do not call this method if you want a null qualifier"); 359 return this; 360 } 361 362 @Override 363 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 364 this.timeRange = timeRange; 365 return this; 366 } 367 368 @Override 369 public CheckAndMutateBuilder ifNotExists() { 370 this.op = CompareOperator.EQUAL; 371 this.value = null; 372 return this; 373 } 374 375 @Override 376 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 377 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 378 this.value = Preconditions.checkNotNull(value, "value is null"); 379 return this; 380 } 381 382 private void preCheck() { 383 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 384 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 385 } 386 387 @Override 388 public CompletableFuture<Boolean> thenPut(Put put) { 389 validatePut(put, conn.connConf.getMaxKeyValueSize()); 390 preCheck(); 391 final Supplier<Span> supplier = newTableOperationSpanBuilder() 392 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 393 .setContainerOperations(put); 394 return tracedFuture( 395 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 396 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 397 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 398 null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 399 (c, r) -> r.getProcessed())) 400 .call(), 401 supplier); 402 } 403 404 @Override 405 public CompletableFuture<Boolean> thenDelete(Delete delete) { 406 preCheck(); 407 final Supplier<Span> supplier = newTableOperationSpanBuilder() 408 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 409 .setContainerOperations(delete); 410 return tracedFuture( 411 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 412 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 413 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value, 414 null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 415 (c, r) -> r.getProcessed())) 416 .call(), 417 supplier); 418 } 419 420 @Override 421 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 422 preCheck(); 423 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 424 final Supplier<Span> supplier = newTableOperationSpanBuilder() 425 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 426 .setContainerOperations(mutations); 427 return tracedFuture(() -> RawAsyncTableImpl.this 428 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 429 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 430 mutations, 431 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, 432 null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 433 CheckAndMutateResult::isSuccess)) 434 .call(), supplier); 435 } 436 } 437 438 @Override 439 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 440 return new CheckAndMutateBuilderImpl(row, family); 441 } 442 443 private final class CheckAndMutateWithFilterBuilderImpl 444 implements CheckAndMutateWithFilterBuilder { 445 446 private final byte[] row; 447 448 private final Filter filter; 449 450 private TimeRange timeRange; 451 452 public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 453 this.row = Preconditions.checkNotNull(row, "row is null"); 454 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 455 } 456 457 @Override 458 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 459 this.timeRange = timeRange; 460 return this; 461 } 462 463 @Override 464 public CompletableFuture<Boolean> thenPut(Put put) { 465 validatePut(put, conn.connConf.getMaxKeyValueSize()); 466 final Supplier<Span> supplier = newTableOperationSpanBuilder() 467 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 468 .setContainerOperations(put); 469 return tracedFuture( 470 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs) 471 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, 472 (rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 473 timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), 474 (c, r) -> r.getProcessed())) 475 .call(), 476 supplier); 477 } 478 479 @Override 480 public CompletableFuture<Boolean> thenDelete(Delete delete) { 481 final Supplier<Span> supplier = newTableOperationSpanBuilder() 482 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 483 .setContainerOperations(delete); 484 return tracedFuture( 485 () -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs) 486 .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, 487 (rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null, filter, 488 timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), 489 (c, r) -> r.getProcessed())) 490 .call(), 491 supplier); 492 } 493 494 @Override 495 public CompletableFuture<Boolean> thenMutate(RowMutations mutations) { 496 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 497 final Supplier<Span> supplier = newTableOperationSpanBuilder() 498 .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 499 .setContainerOperations(mutations); 500 return tracedFuture(() -> RawAsyncTableImpl.this 501 .<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) 502 .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, 503 mutations, 504 (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, 505 timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), 506 CheckAndMutateResult::isSuccess)) 507 .call(), supplier); 508 } 509 } 510 511 @Override 512 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 513 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 514 } 515 516 @Override 517 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 518 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutate) 519 .setContainerOperations(checkAndMutate.getAction()); 520 return tracedFuture(() -> { 521 if ( 522 checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete 523 || checkAndMutate.getAction() instanceof Increment 524 || checkAndMutate.getAction() instanceof Append 525 ) { 526 Mutation mutation = (Mutation) checkAndMutate.getAction(); 527 if (mutation instanceof Put) { 528 validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize()); 529 } 530 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 531 long nonce = conn.getNonceGenerator().newNonce(); 532 return RawAsyncTableImpl.this 533 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), mutation.getPriority(), 534 rpcTimeoutNs) 535 .action( 536 (controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, mutation, 537 (rn, m) -> RequestConverter.buildMutateRequest(rn, checkAndMutate.getRow(), 538 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 539 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 540 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), m, nonceGroup, nonce), 541 (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner()))) 542 .call(); 543 } else if (checkAndMutate.getAction() instanceof RowMutations) { 544 RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); 545 validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); 546 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 547 long nonce = conn.getNonceGenerator().newNonce(); 548 return RawAsyncTableImpl.this 549 .<CheckAndMutateResult> newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), 550 rpcTimeoutNs) 551 .action((controller, loc, stub) -> RawAsyncTableImpl.this.<CheckAndMutateResult, 552 CheckAndMutateResult> mutateRow(controller, loc, stub, rowMutations, 553 (rn, rm) -> RequestConverter.buildMultiRequest(rn, checkAndMutate.getRow(), 554 checkAndMutate.getFamily(), checkAndMutate.getQualifier(), 555 checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 556 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), rm, nonceGroup, nonce), 557 resp -> resp)) 558 .call(); 559 } else { 560 CompletableFuture<CheckAndMutateResult> future = new CompletableFuture<>(); 561 future.completeExceptionally(new DoNotRetryIOException( 562 "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); 563 return future; 564 } 565 }, supplier); 566 } 567 568 @Override 569 public List<CompletableFuture<CheckAndMutateResult>> 570 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 571 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(checkAndMutates) 572 .setContainerOperations(checkAndMutates); 573 return tracedFutures(() -> batch(checkAndMutates, rpcTimeoutNs).stream() 574 .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), supplier); 575 } 576 577 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, 578 // so here I write a new method as I do not want to change the abstraction of call method. 579 @SuppressWarnings("unchecked") 580 private <RES, RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller, 581 HRegionLocation loc, ClientService.Interface stub, RowMutations mutation, 582 Converter<MultiRequest, byte[], RowMutations> reqConvert, Function<RES, RESP> respConverter) { 583 CompletableFuture<RESP> future = new CompletableFuture<>(); 584 try { 585 byte[] regionName = loc.getRegion().getRegionName(); 586 MultiRequest req = reqConvert.convert(regionName, mutation); 587 stub.multi(controller, req, new RpcCallback<MultiResponse>() { 588 589 @Override 590 public void run(MultiResponse resp) { 591 if (controller.failed()) { 592 future.completeExceptionally(controller.getFailed()); 593 } else { 594 try { 595 org.apache.hadoop.hbase.client.MultiResponse multiResp = 596 ResponseConverter.getResults(req, resp, controller.cellScanner()); 597 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(), 598 loc.getServerName(), multiResp); 599 Throwable ex = multiResp.getException(regionName); 600 if (ex != null) { 601 future.completeExceptionally(ex instanceof IOException 602 ? ex 603 : new IOException( 604 "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex)); 605 } else { 606 future.complete( 607 respConverter.apply((RES) multiResp.getResults().get(regionName).result.get(0))); 608 } 609 } catch (IOException e) { 610 future.completeExceptionally(e); 611 } 612 } 613 } 614 }); 615 } catch (IOException e) { 616 future.completeExceptionally(e); 617 } 618 return future; 619 } 620 621 @Override 622 public CompletableFuture<Result> mutateRow(RowMutations mutations) { 623 validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); 624 long nonceGroup = conn.getNonceGenerator().getNonceGroup(); 625 long nonce = conn.getNonceGenerator().newNonce(); 626 final Supplier<Span> supplier = 627 newTableOperationSpanBuilder().setOperation(mutations).setContainerOperations(mutations); 628 return tracedFuture( 629 () -> this 630 .<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) 631 .action((controller, loc, stub) -> this.<Result, Result> mutateRow(controller, loc, stub, 632 mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), 633 resp -> resp)) 634 .call(), 635 supplier); 636 } 637 638 private Scan setDefaultScanConfig(Scan scan) { 639 // always create a new scan object as we may reset the start row later. 640 Scan newScan = ReflectionUtils.newInstance(scan.getClass(), scan); 641 if (newScan.getCaching() <= 0) { 642 newScan.setCaching(defaultScannerCaching); 643 } 644 if (newScan.getMaxResultSize() <= 0) { 645 newScan.setMaxResultSize(defaultScannerMaxResultSize); 646 } 647 return newScan; 648 } 649 650 @Override 651 public void scan(Scan scan, AdvancedScanResultConsumer consumer) { 652 new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, 653 pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, 654 startLogErrorsCnt, requestAttributes).start(); 655 } 656 657 private long resultSize2CacheSize(long maxResultSize) { 658 // * 2 if possible 659 return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2; 660 } 661 662 @Override 663 public AsyncTableResultScanner getScanner(Scan scan) { 664 final long maxCacheSize = resultSize2CacheSize( 665 scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize); 666 final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan); 667 final AsyncTableResultScanner scanner = 668 new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize); 669 scan(scan, scanner); 670 return scanner; 671 } 672 673 @Override 674 public CompletableFuture<List<Result>> scanAll(Scan scan) { 675 CompletableFuture<List<Result>> future = new CompletableFuture<>(); 676 List<Result> scanResults = new ArrayList<>(); 677 scan(scan, new AdvancedScanResultConsumer() { 678 679 @Override 680 public void onNext(Result[] results, ScanController controller) { 681 scanResults.addAll(Arrays.asList(results)); 682 } 683 684 @Override 685 public void onError(Throwable error) { 686 future.completeExceptionally(error); 687 } 688 689 @Override 690 public void onComplete() { 691 future.complete(scanResults); 692 } 693 }); 694 return future; 695 } 696 697 @Override 698 public List<CompletableFuture<Result>> get(List<Get> gets) { 699 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(gets) 700 .setContainerOperations(HBaseSemanticAttributes.Operation.GET); 701 return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); 702 } 703 704 @Override 705 public List<CompletableFuture<Void>> put(List<Put> puts) { 706 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(puts) 707 .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); 708 return tracedFutures(() -> voidMutate(puts), supplier); 709 } 710 711 @Override 712 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 713 final Supplier<Span> supplier = newTableOperationSpanBuilder().setOperation(deletes) 714 .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); 715 return tracedFutures(() -> voidMutate(deletes), supplier); 716 } 717 718 @Override 719 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 720 final Supplier<Span> supplier = 721 newTableOperationSpanBuilder().setOperation(actions).setContainerOperations(actions); 722 return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); 723 } 724 725 private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) { 726 return this.<Object> batch(actions, writeRpcTimeoutNs).stream() 727 .map(f -> f.<Void> thenApply(r -> null)).collect(toList()); 728 } 729 730 private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) { 731 for (Row action : actions) { 732 if (action instanceof Put) { 733 validatePut((Put) action, conn.connConf.getMaxKeyValueSize()); 734 } else if (action instanceof CheckAndMutate) { 735 CheckAndMutate checkAndMutate = (CheckAndMutate) action; 736 if (checkAndMutate.getAction() instanceof Put) { 737 validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); 738 } else if (checkAndMutate.getAction() instanceof RowMutations) { 739 validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), 740 conn.connConf.getMaxKeyValueSize()); 741 } 742 } else if (action instanceof RowMutations) { 743 validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); 744 } 745 } 746 return conn.callerFactory.batch().table(tableName).actions(actions) 747 .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) 748 .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) 749 .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) 750 .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) 751 .setRequestAttributes(requestAttributes).call(); 752 } 753 754 @Override 755 public long getRpcTimeout(TimeUnit unit) { 756 return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); 757 } 758 759 @Override 760 public long getReadRpcTimeout(TimeUnit unit) { 761 return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); 762 } 763 764 @Override 765 public long getWriteRpcTimeout(TimeUnit unit) { 766 return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); 767 } 768 769 @Override 770 public long getOperationTimeout(TimeUnit unit) { 771 return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); 772 } 773 774 @Override 775 public long getScanTimeout(TimeUnit unit) { 776 return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS); 777 } 778 779 @Override 780 public Map<String, byte[]> getRequestAttributes() { 781 return requestAttributes; 782 } 783 784 private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 785 ServiceCaller<S, R> callable, RegionInfo region, byte[] row) { 786 RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName, 787 region, row, rpcTimeoutNs, operationTimeoutNs); 788 final Span span = Span.current(); 789 S stub = stubMaker.apply(channel); 790 CompletableFuture<R> future = new CompletableFuture<>(); 791 ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController(); 792 callable.call(stub, controller, resp -> { 793 try (Scope ignored = span.makeCurrent()) { 794 if (controller.failed()) { 795 final Throwable failure = controller.getFailed(); 796 future.completeExceptionally(failure); 797 TraceUtil.setError(span, failure); 798 } else { 799 future.complete(resp); 800 span.setStatus(StatusCode.OK); 801 } 802 } finally { 803 span.end(); 804 } 805 }); 806 return future; 807 } 808 809 @Override 810 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 811 ServiceCaller<S, R> callable, byte[] row) { 812 return coprocessorService(stubMaker, callable, null, row); 813 } 814 815 private boolean locateFinished(RegionInfo region, byte[] endKey, boolean endKeyInclusive) { 816 if (isEmptyStopRow(endKey)) { 817 if (isEmptyStopRow(region.getEndKey())) { 818 return true; 819 } 820 return false; 821 } else { 822 if (isEmptyStopRow(region.getEndKey())) { 823 return true; 824 } 825 int c = Bytes.compareTo(endKey, region.getEndKey()); 826 // 1. if the region contains endKey 827 // 2. endKey is equal to the region's endKey and we do not want to include endKey. 828 return c < 0 || (c == 0 && !endKeyInclusive); 829 } 830 } 831 832 private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker, 833 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs, 834 byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished, 835 AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) { 836 final Span span = Span.current(); 837 if (error != null) { 838 callback.onError(error); 839 TraceUtil.setError(span, error); 840 span.end(); 841 return; 842 } 843 unfinishedRequest.incrementAndGet(); 844 RegionInfo region = loc.getRegion(); 845 if (locateFinished(region, endKey, endKeyInclusive)) { 846 locateFinished.set(true); 847 } else { 848 addListener(conn.getLocator().getRegionLocation(tableName, region.getEndKey(), 849 RegionLocateType.CURRENT, operationTimeoutNs), (l, e) -> { 850 try (Scope ignored = span.makeCurrent()) { 851 onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive, 852 locateFinished, unfinishedRequest, l, e); 853 } 854 }); 855 } 856 addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> { 857 try (Scope ignored = span.makeCurrent()) { 858 if (e != null) { 859 callback.onRegionError(region, e); 860 } else { 861 callback.onRegionComplete(region, r); 862 } 863 if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) { 864 callback.onComplete(); 865 } 866 } 867 }); 868 } 869 870 private final class CoprocessorServiceBuilderImpl<S, R> 871 implements CoprocessorServiceBuilder<S, R> { 872 873 private final Function<RpcChannel, S> stubMaker; 874 875 private final ServiceCaller<S, R> callable; 876 877 private final CoprocessorCallback<R> callback; 878 879 private byte[] startKey = HConstants.EMPTY_START_ROW; 880 881 private boolean startKeyInclusive; 882 883 private byte[] endKey = HConstants.EMPTY_END_ROW; 884 885 private boolean endKeyInclusive; 886 887 public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker, 888 ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) { 889 this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null"); 890 this.callable = Preconditions.checkNotNull(callable, "callable is null"); 891 this.callback = Preconditions.checkNotNull(callback, "callback is null"); 892 } 893 894 @Override 895 public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) { 896 this.startKey = Preconditions.checkNotNull(startKey, 897 "startKey is null. Consider using" 898 + " an empty byte array, or just do not call this method if you want to start selection" 899 + " from the first region"); 900 this.startKeyInclusive = inclusive; 901 return this; 902 } 903 904 @Override 905 public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) { 906 this.endKey = Preconditions.checkNotNull(endKey, 907 "endKey is null. Consider using" 908 + " an empty byte array, or just do not call this method if you want to continue" 909 + " selection to the last region"); 910 this.endKeyInclusive = inclusive; 911 return this; 912 } 913 914 @Override 915 public void execute() { 916 final Span span = newTableOperationSpanBuilder() 917 .setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC).build(); 918 try (Scope ignored = span.makeCurrent()) { 919 final RegionLocateType regionLocateType = 920 startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER; 921 final CompletableFuture<HRegionLocation> future = conn.getLocator() 922 .getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs); 923 addListener(future, (loc, error) -> { 924 try (Scope ignored1 = span.makeCurrent()) { 925 onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey, 926 endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error); 927 } 928 }); 929 } 930 } 931 } 932 933 @Override 934 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 935 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 936 CoprocessorCallback<R> callback) { 937 return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback); 938 } 939}