001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021 022import com.google.protobuf.RpcChannel; 023import io.opentelemetry.api.trace.Span; 024import io.opentelemetry.context.Context; 025import io.opentelemetry.context.Scope; 026import java.io.IOException; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.CompletableFuture; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.TimeUnit; 032import java.util.function.Function; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.CompareOperator; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.filter.Filter; 037import org.apache.hadoop.hbase.io.TimeRange; 038import org.apache.hadoop.hbase.util.FutureUtils; 039import org.apache.yetus.audience.InterfaceAudience; 040 041/** 042 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a 043 * thread pool when constructing this class, and the callback methods registered to the returned 044 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users 045 * to do anything they want in the callbacks without breaking the rpc framework. 046 */ 047@InterfaceAudience.Private 048class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { 049 050 private final RawAsyncTableImpl rawTable; 051 052 private final ExecutorService pool; 053 054 AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) { 055 this.rawTable = rawTable; 056 this.pool = pool; 057 } 058 059 @Override 060 public TableName getName() { 061 return rawTable.getName(); 062 } 063 064 @Override 065 public Configuration getConfiguration() { 066 return rawTable.getConfiguration(); 067 } 068 069 @Override 070 public CompletableFuture<TableDescriptor> getDescriptor() { 071 return wrap(rawTable.getDescriptor()); 072 } 073 074 @Override 075 public AsyncTableRegionLocator getRegionLocator() { 076 return rawTable.getRegionLocator(); 077 } 078 079 @Override 080 public long getRpcTimeout(TimeUnit unit) { 081 return rawTable.getRpcTimeout(unit); 082 } 083 084 @Override 085 public long getReadRpcTimeout(TimeUnit unit) { 086 return rawTable.getReadRpcTimeout(unit); 087 } 088 089 @Override 090 public long getWriteRpcTimeout(TimeUnit unit) { 091 return rawTable.getWriteRpcTimeout(unit); 092 } 093 094 @Override 095 public long getOperationTimeout(TimeUnit unit) { 096 return rawTable.getOperationTimeout(unit); 097 } 098 099 @Override 100 public long getScanTimeout(TimeUnit unit) { 101 return rawTable.getScanTimeout(unit); 102 } 103 104 @Override 105 public Map<String, byte[]> getRequestAttributes() { 106 return rawTable.getRequestAttributes(); 107 } 108 109 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) { 110 return FutureUtils.wrapFuture(future, pool); 111 } 112 113 @Override 114 public CompletableFuture<Result> get(Get get) { 115 return wrap(rawTable.get(get)); 116 } 117 118 @Override 119 public CompletableFuture<Void> put(Put put) { 120 return wrap(rawTable.put(put)); 121 } 122 123 @Override 124 public CompletableFuture<Void> delete(Delete delete) { 125 return wrap(rawTable.delete(delete)); 126 } 127 128 @Override 129 public CompletableFuture<Result> append(Append append) { 130 return wrap(rawTable.append(append)); 131 } 132 133 @Override 134 public CompletableFuture<Result> increment(Increment increment) { 135 return wrap(rawTable.increment(increment)); 136 } 137 138 @Override 139 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { 140 return new CheckAndMutateBuilder() { 141 142 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family); 143 144 @Override 145 public CompletableFuture<Boolean> thenPut(Put put) { 146 return wrap(builder.thenPut(put)); 147 } 148 149 @Override 150 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 151 return wrap(builder.thenMutate(mutation)); 152 } 153 154 @Override 155 public CompletableFuture<Boolean> thenDelete(Delete delete) { 156 return wrap(builder.thenDelete(delete)); 157 } 158 159 @Override 160 public CheckAndMutateBuilder qualifier(byte[] qualifier) { 161 builder.qualifier(qualifier); 162 return this; 163 } 164 165 @Override 166 public CheckAndMutateBuilder timeRange(TimeRange timeRange) { 167 builder.timeRange(timeRange); 168 return this; 169 } 170 171 @Override 172 public CheckAndMutateBuilder ifNotExists() { 173 builder.ifNotExists(); 174 return this; 175 } 176 177 @Override 178 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) { 179 builder.ifMatches(compareOp, value); 180 return this; 181 } 182 }; 183 } 184 185 @Override 186 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) { 187 return new CheckAndMutateWithFilterBuilder() { 188 189 private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter); 190 191 @Override 192 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { 193 builder.timeRange(timeRange); 194 return this; 195 } 196 197 @Override 198 public CompletableFuture<Boolean> thenPut(Put put) { 199 return wrap(builder.thenPut(put)); 200 } 201 202 @Override 203 public CompletableFuture<Boolean> thenDelete(Delete delete) { 204 return wrap(builder.thenDelete(delete)); 205 } 206 207 @Override 208 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) { 209 return wrap(builder.thenMutate(mutation)); 210 } 211 }; 212 } 213 214 @Override 215 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) { 216 return wrap(rawTable.checkAndMutate(checkAndMutate)); 217 } 218 219 @Override 220 public List<CompletableFuture<CheckAndMutateResult>> 221 checkAndMutate(List<CheckAndMutate> checkAndMutates) { 222 return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList()); 223 } 224 225 @Override 226 public CompletableFuture<Result> mutateRow(RowMutations mutation) { 227 return wrap(rawTable.mutateRow(mutation)); 228 } 229 230 @Override 231 public CompletableFuture<List<Result>> scanAll(Scan scan) { 232 return wrap(rawTable.scanAll(scan)); 233 } 234 235 @Override 236 public ResultScanner getScanner(Scan scan) { 237 return rawTable.getScanner(scan); 238 } 239 240 private void scan0(Scan scan, ScanResultConsumer consumer) { 241 Span span = null; 242 try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { 243 span = scanner.getSpan(); 244 try (Scope ignored = span.makeCurrent()) { 245 if (scan.isScanMetricsEnabled()) { 246 consumer.onScanMetricsCreated(scanner.getScanMetrics()); 247 } 248 for (Result result; (result = scanner.next()) != null;) { 249 if (!consumer.onNext(result)) { 250 break; 251 } 252 } 253 consumer.onComplete(); 254 } 255 } catch (IOException e) { 256 try (Scope ignored = span.makeCurrent()) { 257 consumer.onError(e); 258 } 259 } 260 } 261 262 @Override 263 public void scan(Scan scan, ScanResultConsumer consumer) { 264 final Context context = Context.current(); 265 pool.execute(context.wrap(() -> scan0(scan, consumer))); 266 } 267 268 @Override 269 public List<CompletableFuture<Result>> get(List<Get> gets) { 270 return rawTable.get(gets).stream().map(this::wrap).collect(toList()); 271 } 272 273 @Override 274 public List<CompletableFuture<Void>> put(List<Put> puts) { 275 return rawTable.put(puts).stream().map(this::wrap).collect(toList()); 276 } 277 278 @Override 279 public List<CompletableFuture<Void>> delete(List<Delete> deletes) { 280 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList()); 281 } 282 283 @Override 284 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { 285 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList()); 286 } 287 288 @Override 289 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker, 290 ServiceCaller<S, R> callable, byte[] row) { 291 return wrap(rawTable.coprocessorService(stubMaker, callable, row)); 292 } 293 294 @Override 295 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService( 296 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable, 297 CoprocessorCallback<R> callback) { 298 final Context context = Context.current(); 299 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() { 300 301 @Override 302 public void onRegionComplete(RegionInfo region, R resp) { 303 pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp))); 304 } 305 306 @Override 307 public void onRegionError(RegionInfo region, Throwable error) { 308 pool.execute(context.wrap(() -> callback.onRegionError(region, error))); 309 } 310 311 @Override 312 public void onComplete() { 313 pool.execute(context.wrap(callback::onComplete)); 314 } 315 316 @Override 317 public void onError(Throwable error) { 318 pool.execute(context.wrap(() -> callback.onError(error))); 319 } 320 }; 321 CoprocessorServiceBuilder<S, R> builder = 322 rawTable.coprocessorService(stubMaker, callable, wrappedCallback); 323 return new CoprocessorServiceBuilder<S, R>() { 324 325 @Override 326 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) { 327 builder.fromRow(startKey, inclusive); 328 return this; 329 } 330 331 @Override 332 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) { 333 builder.toRow(endKey, inclusive); 334 return this; 335 } 336 337 @Override 338 public void execute() { 339 builder.execute(); 340 } 341 }; 342 } 343}