001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. 021// Internally, we use shaded protobuf. This below are part of our public API. 022// SEE ABOVE NOTE! 023 024import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; 025 026import com.google.protobuf.Descriptors; 027import com.google.protobuf.Message; 028import com.google.protobuf.Service; 029import com.google.protobuf.ServiceException; 030import io.opentelemetry.api.trace.Span; 031import io.opentelemetry.api.trace.StatusCode; 032import io.opentelemetry.context.Context; 033import io.opentelemetry.context.Scope; 034import java.io.IOException; 035import java.io.InterruptedIOException; 036import java.util.ArrayList; 037import java.util.Collections; 038import java.util.List; 039import java.util.Map; 040import java.util.Optional; 041import java.util.TreeMap; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.ExecutorService; 044import java.util.concurrent.Future; 045import java.util.concurrent.SynchronousQueue; 046import java.util.concurrent.ThreadPoolExecutor; 047import java.util.concurrent.TimeUnit; 048import java.util.function.Supplier; 049import org.apache.hadoop.conf.Configuration; 050import org.apache.hadoop.hbase.CompareOperator; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.HTableDescriptor; 054import org.apache.hadoop.hbase.TableName; 055import org.apache.hadoop.hbase.client.coprocessor.Batch; 056import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 057import org.apache.hadoop.hbase.client.trace.TableSpanBuilder; 058import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 059import org.apache.hadoop.hbase.filter.Filter; 060import org.apache.hadoop.hbase.io.TimeRange; 061import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 062import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 063import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 064import org.apache.hadoop.hbase.trace.TraceUtil; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.Pair; 067import org.apache.hadoop.hbase.util.ReflectionUtils; 068import org.apache.hadoop.hbase.util.Threads; 069import org.apache.yetus.audience.InterfaceAudience; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 075 076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 077import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 078import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 083 084/** 085 * An implementation of {@link Table}. Used to communicate with a single HBase table. Lightweight. 086 * Get as needed and just close when done. Instances of this class SHOULD NOT be constructed 087 * directly. Obtain an instance via {@link Connection}. See {@link ConnectionFactory} class comment 088 * for an example of how. 089 * <p> 090 * This class is thread safe since 2.0.0 if not invoking any of the setter methods. All setters are 091 * moved into {@link TableBuilder} and reserved here only for keeping backward compatibility, and 092 * TODO will be removed soon. 093 * <p> 094 * HTable is no longer a client API. Use {@link Table} instead. It is marked 095 * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in <a href= 096 * "https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/InterfaceClassification.html">Hadoop 097 * Interface Classification</a> There are no guarantees for backwards source / binary compatibility 098 * and methods or class can change or go away without deprecation. 099 * @see Table 100 * @see Admin 101 * @see Connection 102 * @see ConnectionFactory 103 */ 104@InterfaceAudience.Private 105public class HTable implements Table { 106 private static final Logger LOG = LoggerFactory.getLogger(HTable.class); 107 private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG; 108 private final ClusterConnection connection; 109 private final TableName tableName; 110 private final Configuration configuration; 111 private final ConnectionConfiguration connConfiguration; 112 private boolean closed = false; 113 private final int scannerCaching; 114 private final long scannerMaxResultSize; 115 private final ExecutorService pool; // For Multi & Scan 116 private int operationTimeoutMs; // global timeout for each blocking method with retrying rpc 117 private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX 118 private int readRpcTimeoutMs; // timeout for each read rpc request 119 private int writeRpcTimeoutMs; // timeout for each write rpc request 120 121 private final int scanReadRpcTimeout; 122 private final int scanTimeout; 123 private final boolean cleanupPoolOnClose; // shutdown the pool in close() 124 private final HRegionLocator locator; 125 126 /** The Async process for batch */ 127 AsyncProcess multiAp; 128 private final RpcRetryingCallerFactory rpcCallerFactory; 129 private final RpcControllerFactory rpcControllerFactory; 130 131 private final Map<String, byte[]> requestAttributes; 132 133 // Marked Private @since 1.0 134 @InterfaceAudience.Private 135 public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) { 136 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); 137 if (maxThreads == 0) { 138 maxThreads = 1; // is there a better default? 139 } 140 int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1); 141 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); 142 143 // Using the "direct handoff" approach, new threads will only be created 144 // if it is necessary and will grow unbounded. This could be bad but in HCM 145 // we only create as many Runnables as there are region servers. It means 146 // it also scales when new region servers are added. 147 ThreadPoolExecutor pool = 148 new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, TimeUnit.SECONDS, 149 new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("htable-pool-%d") 150 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 151 pool.allowCoreThreadTimeOut(true); 152 return pool; 153 } 154 155 /** 156 * Creates an object to access a HBase table. Used by HBase internally. DO NOT USE. See 157 * {@link ConnectionFactory} class comment for how to get a {@link Table} instance (use 158 * {@link Table} instead of {@link HTable}). 159 * @param connection Connection to be used. 160 * @param builder The table builder 161 * @param rpcCallerFactory The RPC caller factory 162 * @param rpcControllerFactory The RPC controller factory 163 * @param pool ExecutorService to be used. 164 */ 165 @InterfaceAudience.Private 166 protected HTable(final ConnectionImplementation connection, final TableBuilderBase builder, 167 final RpcRetryingCallerFactory rpcCallerFactory, 168 final RpcControllerFactory rpcControllerFactory, final ExecutorService pool, 169 final Map<String, byte[]> requestAttributes) { 170 this.connection = Preconditions.checkNotNull(connection, "connection is null"); 171 this.configuration = connection.getConfiguration(); 172 this.connConfiguration = connection.getConnectionConfiguration(); 173 if (pool == null) { 174 this.pool = getDefaultExecutor(this.configuration); 175 this.cleanupPoolOnClose = true; 176 } else { 177 this.pool = pool; 178 this.cleanupPoolOnClose = false; 179 } 180 if (rpcCallerFactory == null) { 181 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); 182 } else { 183 this.rpcCallerFactory = rpcCallerFactory; 184 } 185 186 if (rpcControllerFactory == null) { 187 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); 188 } else { 189 this.rpcControllerFactory = rpcControllerFactory; 190 } 191 192 this.tableName = builder.tableName; 193 this.operationTimeoutMs = builder.operationTimeout; 194 this.rpcTimeoutMs = builder.rpcTimeout; 195 this.readRpcTimeoutMs = builder.readRpcTimeout; 196 this.writeRpcTimeoutMs = builder.writeRpcTimeout; 197 this.scanReadRpcTimeout = builder.scanReadRpcTimeout; 198 this.scanTimeout = builder.scanTimeout; 199 this.scannerCaching = connConfiguration.getScannerCaching(); 200 this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); 201 this.requestAttributes = requestAttributes; 202 203 // puts need to track errors globally due to how the APIs currently work. 204 multiAp = this.connection.getAsyncProcess(); 205 this.locator = new HRegionLocator(tableName, connection); 206 } 207 208 /** Returns maxKeyValueSize from configuration. */ 209 public static int getMaxKeyValueSize(Configuration conf) { 210 return conf.getInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, -1); 211 } 212 213 @Override 214 public Configuration getConfiguration() { 215 return configuration; 216 } 217 218 @Override 219 public TableName getName() { 220 return tableName; 221 } 222 223 /** 224 * <em>INTERNAL</em> Used by unit tests and tools to do low-level manipulations. 225 * @return A Connection instance. 226 */ 227 protected Connection getConnection() { 228 return this.connection; 229 } 230 231 @Override 232 @Deprecated 233 public HTableDescriptor getTableDescriptor() throws IOException { 234 HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory, 235 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 236 if (htd != null) { 237 return new ImmutableHTableDescriptor(htd); 238 } 239 return null; 240 } 241 242 @Override 243 public TableDescriptor getDescriptor() throws IOException { 244 return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, 245 rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs); 246 } 247 248 /** 249 * Get the corresponding start keys and regions for an arbitrary range of keys. 250 * <p> 251 * @param startKey Starting row in range, inclusive 252 * @param endKey Ending row in range 253 * @param includeEndKey true if endRow is inclusive, false if exclusive 254 * @return A pair of list of start keys and list of HRegionLocations that contain the specified 255 * range 256 * @throws IOException if a remote or network exception occurs 257 */ 258 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey, 259 final byte[] endKey, final boolean includeEndKey) throws IOException { 260 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); 261 } 262 263 /** 264 * Get the corresponding start keys and regions for an arbitrary range of keys. 265 * <p> 266 * @param startKey Starting row in range, inclusive 267 * @param endKey Ending row in range 268 * @param includeEndKey true if endRow is inclusive, false if exclusive 269 * @param reload true to reload information or false to use cached information 270 * @return A pair of list of start keys and list of HRegionLocations that contain the specified 271 * range 272 * @throws IOException if a remote or network exception occurs 273 */ 274 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey, 275 final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException { 276 final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); 277 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { 278 throw new IllegalArgumentException( 279 "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey)); 280 } 281 List<byte[]> keysInRange = new ArrayList<>(); 282 List<HRegionLocation> regionsInRange = new ArrayList<>(); 283 byte[] currentKey = startKey; 284 do { 285 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); 286 keysInRange.add(currentKey); 287 regionsInRange.add(regionLocation); 288 currentKey = regionLocation.getRegionInfo().getEndKey(); 289 } while ( 290 !Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) 291 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 292 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)) 293 ); 294 return new Pair<>(keysInRange, regionsInRange); 295 } 296 297 /** 298 * The underlying {@link HTable} must not be closed. {@link Table#getScanner(Scan)} has other 299 * usage details. 300 */ 301 @Override 302 public ResultScanner getScanner(Scan scan) throws IOException { 303 // Clone to avoid modifying user object from scan internals. 304 // See https://issues.apache.org/jira/browse/HBASE-27402. 305 return getScannerInternal(new Scan(scan), scan); 306 } 307 308 private ResultScanner getScannerInternal(Scan scan, Scan scanForMetrics) throws IOException { 309 final Span span = 310 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(scan).build(); 311 try (Scope ignored = span.makeCurrent()) { 312 if (scan.getCaching() <= 0) { 313 scan.setCaching(scannerCaching); 314 } 315 if (scan.getMaxResultSize() <= 0) { 316 scan.setMaxResultSize(scannerMaxResultSize); 317 } 318 if (scan.getMvccReadPoint() > 0) { 319 // it is not supposed to be set by user, clear 320 scan.resetMvccReadPoint(); 321 } 322 final boolean async = scan.isAsyncPrefetch() != null 323 ? scan.isAsyncPrefetch() 324 : connConfiguration.isClientScannerAsyncPrefetch(); 325 final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan(); 326 327 if (scan.isReversed()) { 328 return new ReversedClientScanner(getConfiguration(), scan, scanForMetrics, getName(), 329 connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, 330 replicaTimeout, connConfiguration, requestAttributes); 331 } else { 332 if (async) { 333 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, scanForMetrics, getName(), 334 connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, 335 scanTimeout, replicaTimeout, connConfiguration, requestAttributes); 336 } else { 337 return new ClientSimpleScanner(getConfiguration(), scan, scanForMetrics, getName(), 338 connection, rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, 339 scanTimeout, replicaTimeout, connConfiguration, requestAttributes); 340 } 341 } 342 } 343 } 344 345 /** 346 * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[])} has other 347 * usage details. 348 */ 349 @Override 350 public ResultScanner getScanner(byte[] family) throws IOException { 351 Scan scan = new Scan(); 352 scan.addFamily(family); 353 return getScannerInternal(scan, scan); 354 } 355 356 /** 357 * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[], byte[])} has 358 * other usage details. 359 */ 360 @Override 361 public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { 362 Scan scan = new Scan(); 363 scan.addColumn(family, qualifier); 364 return getScannerInternal(scan, scan); 365 } 366 367 @Override 368 public Result get(final Get get) throws IOException { 369 final Supplier<Span> supplier = 370 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get); 371 return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier); 372 } 373 374 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { 375 // if we are changing settings to the get, clone it. 376 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { 377 get = ReflectionUtils.newInstance(get.getClass(), get); 378 get.setCheckExistenceOnly(checkExistenceOnly); 379 if (get.getConsistency() == null) { 380 get.setConsistency(DEFAULT_CONSISTENCY); 381 } 382 } 383 384 if (get.getConsistency() == Consistency.STRONG) { 385 final Get configuredGet = get; 386 ClientServiceCallable<Result> callable = 387 new ClientServiceCallable<Result>(this.connection, getName(), get.getRow(), 388 this.rpcControllerFactory.newController(), get.getPriority(), requestAttributes) { 389 @Override 390 protected Result rpcCall() throws Exception { 391 ClientProtos.GetRequest request = RequestConverter 392 .buildGetRequest(getLocation().getRegionInfo().getRegionName(), configuredGet); 393 ClientProtos.GetResponse response = doGet(request); 394 return response == null 395 ? null 396 : ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 397 } 398 }; 399 return rpcCallerFactory.<Result> newCaller(readRpcTimeoutMs).callWithRetries(callable, 400 this.operationTimeoutMs); 401 } 402 403 // Call that takes into account the replica 404 RpcRetryingCallerWithReadReplicas callable = 405 new RpcRetryingCallerWithReadReplicas(rpcControllerFactory, tableName, this.connection, get, 406 pool, connConfiguration.getRetriesNumber(), operationTimeoutMs, readRpcTimeoutMs, 407 connConfiguration.getPrimaryCallTimeoutMicroSecond(), requestAttributes); 408 return callable.call(operationTimeoutMs); 409 } 410 411 @Override 412 public Result[] get(List<Get> gets) throws IOException { 413 final Supplier<Span> supplier = 414 new TableOperationSpanBuilder(connection).setTableName(tableName) 415 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets); 416 return TraceUtil.trace(() -> { 417 if (gets.size() == 1) { 418 return new Result[] { get(gets.get(0)) }; 419 } 420 try { 421 Object[] r1 = new Object[gets.size()]; 422 batch((List<? extends Row>) gets, r1, readRpcTimeoutMs); 423 // Translate. 424 Result[] results = new Result[r1.length]; 425 int i = 0; 426 for (Object obj : r1) { 427 // Batch ensures if there is a failure we get an exception instead 428 results[i++] = (Result) obj; 429 } 430 return results; 431 } catch (InterruptedException e) { 432 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 433 } 434 }, supplier); 435 } 436 437 @Override 438 public void batch(final List<? extends Row> actions, final Object[] results) 439 throws InterruptedException, IOException { 440 int rpcTimeout = writeRpcTimeoutMs; 441 boolean hasRead = false; 442 boolean hasWrite = false; 443 for (Row action : actions) { 444 if (action instanceof Mutation) { 445 hasWrite = true; 446 } else { 447 hasRead = true; 448 } 449 if (hasRead && hasWrite) { 450 break; 451 } 452 } 453 if (hasRead && !hasWrite) { 454 rpcTimeout = readRpcTimeoutMs; 455 } 456 try { 457 batch(actions, results, rpcTimeout); 458 } catch (InterruptedException e) { 459 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 460 } 461 } 462 463 public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout) 464 throws InterruptedException, IOException { 465 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName) 466 .setRowAccess(actions).setResults(results).setRpcTimeout(rpcTimeout) 467 .setOperationTimeout(operationTimeoutMs).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 468 .setRequestAttributes(requestAttributes).build(); 469 final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName) 470 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions) 471 .build(); 472 try (Scope ignored = span.makeCurrent()) { 473 AsyncRequestFuture ars = multiAp.submit(task); 474 ars.waitUntilDone(); 475 if (ars.hasError()) { 476 TraceUtil.setError(span, ars.getErrors()); 477 throw ars.getErrors(); 478 } 479 span.setStatus(StatusCode.OK); 480 } finally { 481 span.end(); 482 } 483 } 484 485 @Override 486 public <R> void batchCallback(final List<? extends Row> actions, final Object[] results, 487 final Batch.Callback<R> callback) throws IOException, InterruptedException { 488 doBatchWithCallback(actions, results, callback, connection, pool, tableName, requestAttributes); 489 } 490 491 public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results, 492 Batch.Callback<R> callback, ClusterConnection connection, ExecutorService pool, 493 TableName tableName, Map<String, byte[]> requestAttributes) 494 throws InterruptedIOException, RetriesExhaustedWithDetailsException { 495 int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout(); 496 int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 497 connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 498 HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 499 AsyncProcessTask<R> task = 500 AsyncProcessTask.newBuilder(callback).setPool(pool).setTableName(tableName) 501 .setRowAccess(actions).setResults(results).setOperationTimeout(operationTimeout) 502 .setRpcTimeout(writeTimeout).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 503 .setRequestAttributes(requestAttributes).build(); 504 final Span span = new TableOperationSpanBuilder(connection).setTableName(tableName) 505 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(actions) 506 .build(); 507 try (Scope ignored = span.makeCurrent()) { 508 AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); 509 ars.waitUntilDone(); 510 if (ars.hasError()) { 511 TraceUtil.setError(span, ars.getErrors()); 512 throw ars.getErrors(); 513 } 514 } finally { 515 span.end(); 516 } 517 } 518 519 @Override 520 public void delete(final Delete delete) throws IOException { 521 final Supplier<Span> supplier = 522 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(delete); 523 TraceUtil.trace(() -> { 524 ClientServiceCallable<Void> callable = 525 new ClientServiceCallable<Void>(this.connection, getName(), delete.getRow(), 526 this.rpcControllerFactory.newController(), delete.getPriority(), requestAttributes) { 527 @Override 528 protected Void rpcCall() throws Exception { 529 MutateRequest request = RequestConverter 530 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), delete); 531 doMutate(request); 532 return null; 533 } 534 }; 535 rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 536 this.operationTimeoutMs); 537 }, supplier); 538 } 539 540 @Override 541 public void delete(final List<Delete> deletes) throws IOException { 542 Object[] results = new Object[deletes.size()]; 543 try { 544 batch(deletes, results, writeRpcTimeoutMs); 545 } catch (InterruptedException e) { 546 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 547 } finally { 548 // TODO: to be consistent with batch put(), do not modify input list 549 // mutate list so that it is empty for complete success, or contains only failed records 550 // results are returned in the same order as the requests in list walk the list backwards, 551 // so we can remove from list without impacting the indexes of earlier members 552 for (int i = results.length - 1; i >= 0; i--) { 553 // if result is not null, it succeeded 554 if (results[i] instanceof Result) { 555 deletes.remove(i); 556 } 557 } 558 } 559 } 560 561 @Override 562 public void put(final Put put) throws IOException { 563 final Supplier<Span> supplier = 564 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(put); 565 TraceUtil.trace(() -> { 566 validatePut(put); 567 ClientServiceCallable<Void> callable = 568 new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), 569 this.rpcControllerFactory.newController(), put.getPriority(), requestAttributes) { 570 @Override 571 protected Void rpcCall() throws Exception { 572 MutateRequest request = RequestConverter 573 .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); 574 doMutate(request); 575 return null; 576 } 577 }; 578 rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 579 this.operationTimeoutMs); 580 }, supplier); 581 } 582 583 @Override 584 public void put(final List<Put> puts) throws IOException { 585 for (Put put : puts) { 586 validatePut(put); 587 } 588 Object[] results = new Object[puts.size()]; 589 try { 590 batch(puts, results, writeRpcTimeoutMs); 591 } catch (InterruptedException e) { 592 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 593 } 594 } 595 596 @Override 597 public Result mutateRow(final RowMutations rm) throws IOException { 598 final Supplier<Span> supplier = 599 new TableOperationSpanBuilder(connection).setTableName(tableName) 600 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(rm); 601 return TraceUtil.trace(() -> { 602 long nonceGroup = getNonceGroup(); 603 long nonce = getNonce(); 604 CancellableRegionServerCallable<MultiResponse> callable = 605 new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), 606 rpcControllerFactory.newController(), writeRpcTimeoutMs, 607 new RetryingTimeTracker().start(), rm.getMaxPriority(), requestAttributes) { 608 @Override 609 protected MultiResponse rpcCall() throws Exception { 610 MultiRequest request = RequestConverter.buildMultiRequest( 611 getLocation().getRegionInfo().getRegionName(), rm, nonceGroup, nonce); 612 ClientProtos.MultiResponse response = doMulti(request); 613 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 614 if (res.hasException()) { 615 Throwable ex = ProtobufUtil.toException(res.getException()); 616 if (ex instanceof IOException) { 617 throw (IOException) ex; 618 } 619 throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), 620 ex); 621 } 622 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 623 } 624 }; 625 Object[] results = new Object[rm.getMutations().size()]; 626 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName) 627 .setRowAccess(rm.getMutations()).setCallable(callable).setRpcTimeout(writeRpcTimeoutMs) 628 .setOperationTimeout(operationTimeoutMs) 629 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results) 630 .setRequestAttributes(requestAttributes).build(); 631 AsyncRequestFuture ars = multiAp.submit(task); 632 ars.waitUntilDone(); 633 if (ars.hasError()) { 634 throw ars.getErrors(); 635 } 636 return (Result) results[0]; 637 }, supplier); 638 } 639 640 private long getNonceGroup() { 641 return ((ClusterConnection) getConnection()).getNonceGenerator().getNonceGroup(); 642 } 643 644 private long getNonce() { 645 return ((ClusterConnection) getConnection()).getNonceGenerator().newNonce(); 646 } 647 648 @Override 649 public Result append(final Append append) throws IOException { 650 final Supplier<Span> supplier = 651 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(append); 652 return TraceUtil.trace(() -> { 653 checkHasFamilies(append); 654 NoncedRegionServerCallable<Result> callable = 655 new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(), 656 this.rpcControllerFactory.newController(), append.getPriority(), requestAttributes) { 657 @Override 658 protected Result rpcCall() throws Exception { 659 MutateRequest request = 660 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), 661 append, super.getNonceGroup(), super.getNonce()); 662 MutateResponse response = doMutate(request); 663 if (!response.hasResult()) { 664 return null; 665 } 666 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 667 } 668 }; 669 return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 670 this.operationTimeoutMs); 671 }, supplier); 672 } 673 674 @Override 675 public Result increment(final Increment increment) throws IOException { 676 final Supplier<Span> supplier = 677 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(increment); 678 return TraceUtil.trace(() -> { 679 checkHasFamilies(increment); 680 NoncedRegionServerCallable<Result> callable = 681 new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(), 682 this.rpcControllerFactory.newController(), increment.getPriority(), requestAttributes) { 683 @Override 684 protected Result rpcCall() throws Exception { 685 MutateRequest request = 686 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), 687 increment, super.getNonceGroup(), super.getNonce()); 688 MutateResponse response = doMutate(request); 689 // Should this check for null like append does? 690 return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 691 } 692 }; 693 return rpcCallerFactory.<Result> newCaller(writeRpcTimeoutMs).callWithRetries(callable, 694 this.operationTimeoutMs); 695 }, supplier); 696 } 697 698 @Override 699 public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, 700 final long amount) throws IOException { 701 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); 702 } 703 704 @Override 705 public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, 706 final long amount, final Durability durability) throws IOException { 707 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 708 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.INCREMENT); 709 return TraceUtil.trace(() -> { 710 NullPointerException npe = null; 711 if (row == null) { 712 npe = new NullPointerException("row is null"); 713 } else if (family == null) { 714 npe = new NullPointerException("family is null"); 715 } 716 if (npe != null) { 717 throw new IOException("Invalid arguments to incrementColumnValue", npe); 718 } 719 720 NoncedRegionServerCallable<Long> callable = 721 new NoncedRegionServerCallable<Long>(this.connection, getName(), row, 722 this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET, requestAttributes) { 723 @Override 724 protected Long rpcCall() throws Exception { 725 MutateRequest request = RequestConverter.buildIncrementRequest( 726 getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount, 727 durability, super.getNonceGroup(), super.getNonce()); 728 MutateResponse response = doMutate(request); 729 Result result = 730 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); 731 return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); 732 } 733 }; 734 return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, 735 this.operationTimeoutMs); 736 }, supplier); 737 } 738 739 @Override 740 @Deprecated 741 public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 742 final byte[] value, final Put put) throws IOException { 743 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 744 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 745 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 746 HBaseSemanticAttributes.Operation.PUT); 747 return TraceUtil.trace( 748 () -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put) 749 .isSuccess(), 750 supplier); 751 } 752 753 @Override 754 @Deprecated 755 public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 756 final CompareOp compareOp, final byte[] value, final Put put) throws IOException { 757 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 758 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 759 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 760 HBaseSemanticAttributes.Operation.PUT); 761 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, 762 toCompareOperator(compareOp), value, null, null, put).isSuccess(), supplier); 763 } 764 765 @Override 766 @Deprecated 767 public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, 768 final CompareOperator op, final byte[] value, final Put put) throws IOException { 769 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 770 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 771 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 772 HBaseSemanticAttributes.Operation.PUT); 773 return TraceUtil.trace( 774 () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(), 775 supplier); 776 } 777 778 @Override 779 @Deprecated 780 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 781 final byte[] value, final Delete delete) throws IOException { 782 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 783 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 784 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 785 HBaseSemanticAttributes.Operation.DELETE); 786 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, 787 value, null, null, delete).isSuccess(), supplier); 788 } 789 790 @Override 791 @Deprecated 792 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 793 final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { 794 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 795 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 796 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 797 HBaseSemanticAttributes.Operation.DELETE); 798 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, 799 toCompareOperator(compareOp), value, null, null, delete).isSuccess(), supplier); 800 } 801 802 @Override 803 @Deprecated 804 public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, 805 final CompareOperator op, final byte[] value, final Delete delete) throws IOException { 806 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 807 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 808 .setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, 809 HBaseSemanticAttributes.Operation.DELETE); 810 return TraceUtil.trace( 811 () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(), 812 supplier); 813 } 814 815 @Override 816 @Deprecated 817 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 818 return new CheckAndMutateBuilderImpl(row, family); 819 } 820 821 @Override 822 @Deprecated 823 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 824 return new CheckAndMutateWithFilterBuilderImpl(row, filter); 825 } 826 827 private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, 828 final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, 829 final TimeRange timeRange, final RowMutations rm) throws IOException { 830 long nonceGroup = getNonceGroup(); 831 long nonce = getNonce(); 832 CancellableRegionServerCallable<MultiResponse> callable = 833 new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), 834 rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), 835 rm.getMaxPriority(), requestAttributes) { 836 @Override 837 protected MultiResponse rpcCall() throws Exception { 838 MultiRequest request = 839 RequestConverter.buildMultiRequest(getLocation().getRegionInfo().getRegionName(), row, 840 family, qualifier, op, value, filter, timeRange, rm, nonceGroup, nonce); 841 ClientProtos.MultiResponse response = doMulti(request); 842 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); 843 if (res.hasException()) { 844 Throwable ex = ProtobufUtil.toException(res.getException()); 845 if (ex instanceof IOException) { 846 throw (IOException) ex; 847 } 848 throw new IOException( 849 "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); 850 } 851 return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); 852 } 853 }; 854 855 /** 856 * Currently, we use one array to store 'processed' flag which is returned by server. It is 857 * excessive to send such a large array, but that is required by the framework right now 858 */ 859 Object[] results = new Object[rm.getMutations().size()]; 860 AsyncProcessTask task = AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName) 861 .setRowAccess(rm.getMutations()).setResults(results).setCallable(callable) 862 // TODO any better timeout? 863 .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) 864 .setOperationTimeout(operationTimeoutMs).setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 865 .setRequestAttributes(requestAttributes).build(); 866 AsyncRequestFuture ars = multiAp.submit(task); 867 ars.waitUntilDone(); 868 if (ars.hasError()) { 869 throw ars.getErrors(); 870 } 871 872 return (CheckAndMutateResult) results[0]; 873 } 874 875 @Override 876 @Deprecated 877 public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, 878 final CompareOp compareOp, final byte[] value, final RowMutations rm) throws IOException { 879 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 880 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 881 .setContainerOperations(rm); 882 return TraceUtil.trace(() -> doCheckAndMutate(row, family, qualifier, 883 toCompareOperator(compareOp), value, null, null, rm).isSuccess(), supplier); 884 } 885 886 @Override 887 @Deprecated 888 public boolean checkAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, 889 final CompareOperator op, final byte[] value, final RowMutations rm) throws IOException { 890 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 891 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) 892 .setContainerOperations(rm); 893 return TraceUtil.trace( 894 () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(), 895 supplier); 896 } 897 898 @Override 899 public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { 900 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 901 .setTableName(tableName).setOperation(checkAndMutate).setContainerOperations(checkAndMutate); 902 return TraceUtil.trace(() -> { 903 Row action = checkAndMutate.getAction(); 904 if ( 905 action instanceof Put || action instanceof Delete || action instanceof Increment 906 || action instanceof Append 907 ) { 908 if (action instanceof Put) { 909 validatePut((Put) action); 910 } 911 return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), 912 checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 913 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action); 914 } else { 915 return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(), 916 checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), 917 checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action); 918 } 919 }, supplier); 920 } 921 922 private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, 923 final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter, 924 final TimeRange timeRange, final Mutation mutation) throws IOException { 925 long nonceGroup = getNonceGroup(); 926 long nonce = getNonce(); 927 ClientServiceCallable<CheckAndMutateResult> callable = 928 new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row, 929 this.rpcControllerFactory.newController(), mutation.getPriority(), requestAttributes) { 930 @Override 931 protected CheckAndMutateResult rpcCall() throws Exception { 932 MutateRequest request = 933 RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, 934 family, qualifier, op, value, filter, timeRange, mutation, nonceGroup, nonce); 935 MutateResponse response = doMutate(request); 936 if (response.hasResult()) { 937 return new CheckAndMutateResult(response.getProcessed(), 938 ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner())); 939 } 940 return new CheckAndMutateResult(response.getProcessed(), null); 941 } 942 }; 943 return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs) 944 .callWithRetries(callable, this.operationTimeoutMs); 945 } 946 947 @Override 948 public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates) 949 throws IOException { 950 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 951 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.BATCH) 952 .setContainerOperations(checkAndMutates); 953 return TraceUtil.trace(() -> { 954 if (checkAndMutates.isEmpty()) { 955 return Collections.emptyList(); 956 } 957 if (checkAndMutates.size() == 1) { 958 return Collections.singletonList(checkAndMutate(checkAndMutates.get(0))); 959 } 960 961 Object[] results = new Object[checkAndMutates.size()]; 962 try { 963 batch(checkAndMutates, results, writeRpcTimeoutMs); 964 } catch (InterruptedException e) { 965 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 966 } 967 968 // translate. 969 List<CheckAndMutateResult> ret = new ArrayList<>(results.length); 970 for (Object r : results) { 971 // Batch ensures if there is a failure we get an exception instead 972 ret.add((CheckAndMutateResult) r); 973 } 974 return ret; 975 }, supplier); 976 } 977 978 private CompareOperator toCompareOperator(CompareOp compareOp) { 979 switch (compareOp) { 980 case LESS: 981 return CompareOperator.LESS; 982 983 case LESS_OR_EQUAL: 984 return CompareOperator.LESS_OR_EQUAL; 985 986 case EQUAL: 987 return CompareOperator.EQUAL; 988 989 case NOT_EQUAL: 990 return CompareOperator.NOT_EQUAL; 991 992 case GREATER_OR_EQUAL: 993 return CompareOperator.GREATER_OR_EQUAL; 994 995 case GREATER: 996 return CompareOperator.GREATER; 997 998 case NO_OP: 999 return CompareOperator.NO_OP; 1000 1001 default: 1002 throw new AssertionError(); 1003 } 1004 } 1005 1006 @Override 1007 public boolean exists(final Get get) throws IOException { 1008 final Supplier<Span> supplier = 1009 new TableOperationSpanBuilder(connection).setTableName(tableName).setOperation(get); 1010 return TraceUtil.trace(() -> { 1011 Result r = get(get, true); 1012 assert r.getExists() != null; 1013 return r.getExists(); 1014 }, supplier); 1015 } 1016 1017 @Override 1018 public boolean[] exists(List<Get> gets) throws IOException { 1019 final Supplier<Span> supplier = 1020 new TableOperationSpanBuilder(connection).setTableName(tableName) 1021 .setOperation(HBaseSemanticAttributes.Operation.BATCH).setContainerOperations(gets); 1022 return TraceUtil.trace(() -> { 1023 if (gets.isEmpty()) { 1024 return new boolean[] {}; 1025 } 1026 if (gets.size() == 1) { 1027 return new boolean[] { exists(gets.get(0)) }; 1028 } 1029 1030 ArrayList<Get> exists = new ArrayList<>(gets.size()); 1031 for (Get g : gets) { 1032 Get ge = new Get(g); 1033 ge.setCheckExistenceOnly(true); 1034 exists.add(ge); 1035 } 1036 1037 Object[] r1 = new Object[exists.size()]; 1038 try { 1039 batch(exists, r1, readRpcTimeoutMs); 1040 } catch (InterruptedException e) { 1041 throw (InterruptedIOException) new InterruptedIOException().initCause(e); 1042 } 1043 1044 // translate. 1045 boolean[] results = new boolean[r1.length]; 1046 int i = 0; 1047 for (Object o : r1) { 1048 // batch ensures if there is a failure we get an exception instead 1049 results[i++] = ((Result) o).getExists(); 1050 } 1051 1052 return results; 1053 }, supplier); 1054 } 1055 1056 /** 1057 * Process a mixed batch of Get, Put and Delete actions. All actions for a RegionServer are 1058 * forwarded in one RPC call. Queries are executed in parallel. 1059 * @param list The collection of actions. 1060 * @param results An empty array, same size as list. If an exception is thrown, you can test here 1061 * for partial results, and to determine which actions processed successfully. 1062 * @throws IOException if there are problems talking to META. Per-item exceptions are stored in 1063 * the results array. 1064 */ 1065 public <R> void processBatchCallback(final List<? extends Row> list, final Object[] results, 1066 final Batch.Callback<R> callback) throws IOException, InterruptedException { 1067 this.batchCallback(list, results, callback); 1068 } 1069 1070 @Override 1071 public void close() throws IOException { 1072 final Supplier<Span> supplier = 1073 new TableSpanBuilder(connection).setName("HTable.close").setTableName(tableName); 1074 TraceUtil.trace(() -> { 1075 if (this.closed) { 1076 return; 1077 } 1078 if (cleanupPoolOnClose) { 1079 this.pool.shutdown(); 1080 try { 1081 boolean terminated = false; 1082 do { 1083 // wait until the pool has terminated 1084 terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); 1085 } while (!terminated); 1086 } catch (InterruptedException e) { 1087 this.pool.shutdownNow(); 1088 LOG.warn("waitForTermination interrupted"); 1089 } 1090 } 1091 this.closed = true; 1092 }, supplier); 1093 } 1094 1095 // validate for well-formedness 1096 private void validatePut(final Put put) throws IllegalArgumentException { 1097 ConnectionUtils.validatePut(put, connConfiguration.getMaxKeyValueSize()); 1098 } 1099 1100 /** 1101 * The pool is used for mutli requests for this HTable 1102 * @return the pool used for mutli 1103 */ 1104 ExecutorService getPool() { 1105 return this.pool; 1106 } 1107 1108 /** 1109 * Explicitly clears the region cache to fetch the latest value from META. This is a power user 1110 * function: avoid unless you know the ramifications. 1111 */ 1112 public void clearRegionCache() { 1113 this.connection.clearRegionLocationCache(); 1114 } 1115 1116 @Override 1117 public CoprocessorRpcChannel coprocessorService(byte[] row) { 1118 return new RegionCoprocessorRpcChannel(connection, tableName, row, requestAttributes); 1119 } 1120 1121 @Override 1122 public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service, 1123 byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable) 1124 throws ServiceException, Throwable { 1125 final Map<byte[], R> results = 1126 Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR)); 1127 coprocessorService(service, startKey, endKey, callable, (region, row, value) -> { 1128 if (region != null) { 1129 results.put(region, value); 1130 } 1131 }); 1132 return results; 1133 } 1134 1135 @Override 1136 public <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey, 1137 byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback) 1138 throws ServiceException, Throwable { 1139 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1140 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); 1141 TraceUtil.trace(() -> { 1142 final Context context = Context.current(); 1143 final ExecutorService wrappedPool = context.wrap(pool); 1144 // get regions covered by the row range 1145 List<byte[]> keys = getStartKeysInRange(startKey, endKey); 1146 Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR); 1147 for (final byte[] r : keys) { 1148 final RegionCoprocessorRpcChannel channel = 1149 new RegionCoprocessorRpcChannel(connection, tableName, r, requestAttributes); 1150 Future<R> future = wrappedPool.submit(() -> { 1151 T instance = 1152 org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel); 1153 R result = callable.call(instance); 1154 byte[] region = channel.getLastRegion(); 1155 if (callback != null) { 1156 callback.update(region, r, result); 1157 } 1158 return result; 1159 }); 1160 futures.put(r, future); 1161 } 1162 for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) { 1163 try { 1164 e.getValue().get(); 1165 } catch (ExecutionException ee) { 1166 LOG.warn("Error calling coprocessor service {} for row {}", service.getName(), 1167 Bytes.toStringBinary(e.getKey()), ee); 1168 throw ee.getCause(); 1169 } catch (InterruptedException ie) { 1170 throw new InterruptedIOException("Interrupted calling coprocessor service " 1171 + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); 1172 } 1173 } 1174 }, supplier); 1175 } 1176 1177 private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException { 1178 if (start == null) { 1179 start = HConstants.EMPTY_START_ROW; 1180 } 1181 if (end == null) { 1182 end = HConstants.EMPTY_END_ROW; 1183 } 1184 return getKeysAndRegionsInRange(start, end, true).getFirst(); 1185 } 1186 1187 @Override 1188 public long getRpcTimeout(TimeUnit unit) { 1189 return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS); 1190 } 1191 1192 @Override 1193 @Deprecated 1194 public int getRpcTimeout() { 1195 return rpcTimeoutMs; 1196 } 1197 1198 @Override 1199 @Deprecated 1200 public void setRpcTimeout(int rpcTimeout) { 1201 setReadRpcTimeout(rpcTimeout); 1202 setWriteRpcTimeout(rpcTimeout); 1203 } 1204 1205 @Override 1206 public long getReadRpcTimeout(TimeUnit unit) { 1207 return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS); 1208 } 1209 1210 @Override 1211 @Deprecated 1212 public int getReadRpcTimeout() { 1213 return readRpcTimeoutMs; 1214 } 1215 1216 @Override 1217 @Deprecated 1218 public void setReadRpcTimeout(int readRpcTimeout) { 1219 this.readRpcTimeoutMs = readRpcTimeout; 1220 } 1221 1222 @Override 1223 public long getWriteRpcTimeout(TimeUnit unit) { 1224 return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS); 1225 } 1226 1227 @Override 1228 @Deprecated 1229 public int getWriteRpcTimeout() { 1230 return writeRpcTimeoutMs; 1231 } 1232 1233 @Override 1234 @Deprecated 1235 public void setWriteRpcTimeout(int writeRpcTimeout) { 1236 this.writeRpcTimeoutMs = writeRpcTimeout; 1237 } 1238 1239 @Override 1240 public long getOperationTimeout(TimeUnit unit) { 1241 return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS); 1242 } 1243 1244 @Override 1245 @Deprecated 1246 public int getOperationTimeout() { 1247 return operationTimeoutMs; 1248 } 1249 1250 @Override 1251 @Deprecated 1252 public void setOperationTimeout(int operationTimeout) { 1253 this.operationTimeoutMs = operationTimeout; 1254 } 1255 1256 @Override 1257 public String toString() { 1258 return tableName + ";" + connection; 1259 } 1260 1261 @Override 1262 public <R extends Message> Map<byte[], R> batchCoprocessorService( 1263 Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, 1264 R responsePrototype) throws ServiceException, Throwable { 1265 final Map<byte[], R> results = 1266 Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR)); 1267 batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, 1268 (region, row, result) -> { 1269 if (region != null) { 1270 results.put(region, result); 1271 } 1272 }); 1273 return results; 1274 } 1275 1276 @Override 1277 public <R extends Message> void batchCoprocessorService( 1278 final Descriptors.MethodDescriptor methodDescriptor, final Message request, byte[] startKey, 1279 byte[] endKey, final R responsePrototype, final Batch.Callback<R> callback) 1280 throws ServiceException, Throwable { 1281 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1282 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC); 1283 TraceUtil.trace(() -> { 1284 final Context context = Context.current(); 1285 final byte[] sanitizedStartKey = 1286 Optional.ofNullable(startKey).orElse(HConstants.EMPTY_START_ROW); 1287 final byte[] sanitizedEndKey = Optional.ofNullable(endKey).orElse(HConstants.EMPTY_END_ROW); 1288 1289 // get regions covered by the row range 1290 Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions = 1291 getKeysAndRegionsInRange(sanitizedStartKey, sanitizedEndKey, true); 1292 List<byte[]> keys = keysAndRegions.getFirst(); 1293 List<HRegionLocation> regions = keysAndRegions.getSecond(); 1294 1295 // check if we have any calls to make 1296 if (keys.isEmpty()) { 1297 LOG.info("No regions were selected by key range start={}, end={}", 1298 Bytes.toStringBinary(sanitizedStartKey), Bytes.toStringBinary(sanitizedEndKey)); 1299 return; 1300 } 1301 1302 List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size()); 1303 final Map<byte[], RegionCoprocessorServiceExec> execsByRow = 1304 new TreeMap<>(Bytes.BYTES_COMPARATOR); 1305 for (int i = 0; i < keys.size(); i++) { 1306 final byte[] rowKey = keys.get(i); 1307 final byte[] region = regions.get(i).getRegionInfo().getRegionName(); 1308 RegionCoprocessorServiceExec exec = 1309 new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request); 1310 execs.add(exec); 1311 execsByRow.put(rowKey, exec); 1312 } 1313 1314 // tracking for any possible deserialization errors on success callback 1315 // TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here 1316 final List<Throwable> callbackErrorExceptions = new ArrayList<>(); 1317 final List<Row> callbackErrorActions = new ArrayList<>(); 1318 final List<String> callbackErrorServers = new ArrayList<>(); 1319 Object[] results = new Object[execs.size()]; 1320 1321 AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, 1322 RpcRetryingCallerFactory.instantiate(configuration, connConfiguration, 1323 connection.getStatisticsTracker(), connection.getConnectionMetrics()), 1324 RpcControllerFactory.instantiate(configuration)); 1325 1326 Batch.Callback<ClientProtos.CoprocessorServiceResult> resultsCallback = 1327 (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> { 1328 if (LOG.isTraceEnabled()) { 1329 LOG.trace("Received result for endpoint {}: region={}, row={}, value={}", 1330 methodDescriptor.getFullName(), Bytes.toStringBinary(region), 1331 Bytes.toStringBinary(row), serviceResult.getValue().getValue()); 1332 } 1333 try { 1334 Message.Builder builder = responsePrototype.newBuilderForType(); 1335 org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder, 1336 serviceResult.getValue().getValue().toByteArray()); 1337 callback.update(region, row, (R) builder.build()); 1338 } catch (IOException e) { 1339 LOG.error("Unexpected response type from endpoint {}", methodDescriptor.getFullName(), 1340 e); 1341 callbackErrorExceptions.add(e); 1342 callbackErrorActions.add(execsByRow.get(row)); 1343 callbackErrorServers.add("null"); 1344 } 1345 }; 1346 AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = 1347 AsyncProcessTask.newBuilder(resultsCallback).setPool(context.wrap(pool)) 1348 .setTableName(tableName).setRowAccess(execs).setResults(results) 1349 .setRpcTimeout(readRpcTimeoutMs).setOperationTimeout(operationTimeoutMs) 1350 .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) 1351 .setRequestAttributes(requestAttributes).build(); 1352 AsyncRequestFuture future = asyncProcess.submit(task); 1353 future.waitUntilDone(); 1354 1355 if (future.hasError()) { 1356 throw future.getErrors(); 1357 } else if (!callbackErrorExceptions.isEmpty()) { 1358 throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, 1359 callbackErrorActions, callbackErrorServers); 1360 } 1361 }, supplier); 1362 } 1363 1364 @Override 1365 public RegionLocator getRegionLocator() { 1366 return this.locator; 1367 } 1368 1369 private class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { 1370 1371 private final byte[] row; 1372 private final byte[] family; 1373 private byte[] qualifier; 1374 private TimeRange timeRange; 1375 private CompareOperator op; 1376 private byte[] value; 1377 1378 CheckAndMutateBuilderImpl(byte[] row, byte[] family) { 1379 this.row = Preconditions.checkNotNull(row, "row is null"); 1380 this.family = Preconditions.checkNotNull(family, "family is null"); 1381 } 1382 1383 @Override 1384 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 1385 this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" 1386 + " an empty byte array, or just do not call this method if you want a null qualifier"); 1387 return this; 1388 } 1389 1390 @Override 1391 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 1392 this.timeRange = timeRange; 1393 return this; 1394 } 1395 1396 @Override 1397 public CheckAndMutateBuilder ifNotExists() { 1398 this.op = CompareOperator.EQUAL; 1399 this.value = null; 1400 return this; 1401 } 1402 1403 @Override 1404 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 1405 this.op = Preconditions.checkNotNull(compareOp, "compareOp is null"); 1406 this.value = Preconditions.checkNotNull(value, "value is null"); 1407 return this; 1408 } 1409 1410 private void preCheck() { 1411 Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" 1412 + " calling ifNotExists/ifEquals/ifMatches before executing the request"); 1413 } 1414 1415 @Override 1416 public boolean thenPut(Put put) throws IOException { 1417 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1418 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1419 return TraceUtil.trace(() -> { 1420 validatePut(put); 1421 preCheck(); 1422 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put) 1423 .isSuccess(); 1424 }, supplier); 1425 } 1426 1427 @Override 1428 public boolean thenDelete(Delete delete) throws IOException { 1429 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1430 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1431 return TraceUtil.trace(() -> { 1432 preCheck(); 1433 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete) 1434 .isSuccess(); 1435 }, supplier); 1436 } 1437 1438 @Override 1439 public boolean thenMutate(RowMutations mutation) throws IOException { 1440 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1441 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1442 return TraceUtil.trace(() -> { 1443 preCheck(); 1444 return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation) 1445 .isSuccess(); 1446 }, supplier); 1447 } 1448 } 1449 1450 @Override 1451 public Map<String, byte[]> getRequestAttributes() { 1452 return requestAttributes; 1453 } 1454 1455 private class CheckAndMutateWithFilterBuilderImpl implements CheckAndMutateWithFilterBuilder { 1456 1457 private final byte[] row; 1458 private final Filter filter; 1459 private TimeRange timeRange; 1460 1461 CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) { 1462 this.row = Preconditions.checkNotNull(row, "row is null"); 1463 this.filter = Preconditions.checkNotNull(filter, "filter is null"); 1464 } 1465 1466 @Override 1467 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 1468 this.timeRange = timeRange; 1469 return this; 1470 } 1471 1472 @Override 1473 public boolean thenPut(Put put) throws IOException { 1474 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1475 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1476 return TraceUtil.trace(() -> { 1477 validatePut(put); 1478 return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess(); 1479 }, supplier); 1480 } 1481 1482 @Override 1483 public boolean thenDelete(Delete delete) throws IOException { 1484 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1485 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1486 return TraceUtil.trace( 1487 () -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(), 1488 supplier); 1489 } 1490 1491 @Override 1492 public boolean thenMutate(RowMutations mutation) throws IOException { 1493 final Supplier<Span> supplier = new TableOperationSpanBuilder(connection) 1494 .setTableName(tableName).setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); 1495 return TraceUtil 1496 .trace(() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation) 1497 .isSuccess(), supplier); 1498 } 1499 } 1500}