001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static java.util.stream.Collectors.toList;
021
022import com.google.protobuf.RpcChannel;
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.context.Context;
025import io.opentelemetry.context.Scope;
026import java.io.IOException;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.CompletableFuture;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.TimeUnit;
032import java.util.function.Function;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CompareOperator;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.filter.Filter;
037import org.apache.hadoop.hbase.io.TimeRange;
038import org.apache.hadoop.hbase.util.FutureUtils;
039import org.apache.yetus.audience.InterfaceAudience;
040
041/**
042 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
043 * thread pool when constructing this class, and the callback methods registered to the returned
044 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
045 * to do anything they want in the callbacks without breaking the rpc framework.
046 */
047@InterfaceAudience.Private
048class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
049
050  private final RawAsyncTableImpl rawTable;
051
052  private final ExecutorService pool;
053
054  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
055    this.rawTable = rawTable;
056    this.pool = pool;
057  }
058
059  @Override
060  public TableName getName() {
061    return rawTable.getName();
062  }
063
064  @Override
065  public Configuration getConfiguration() {
066    return rawTable.getConfiguration();
067  }
068
069  @Override
070  public CompletableFuture<TableDescriptor> getDescriptor() {
071    return wrap(rawTable.getDescriptor());
072  }
073
074  @Override
075  public AsyncTableRegionLocator getRegionLocator() {
076    return rawTable.getRegionLocator();
077  }
078
079  @Override
080  public long getRpcTimeout(TimeUnit unit) {
081    return rawTable.getRpcTimeout(unit);
082  }
083
084  @Override
085  public long getReadRpcTimeout(TimeUnit unit) {
086    return rawTable.getReadRpcTimeout(unit);
087  }
088
089  @Override
090  public long getWriteRpcTimeout(TimeUnit unit) {
091    return rawTable.getWriteRpcTimeout(unit);
092  }
093
094  @Override
095  public long getOperationTimeout(TimeUnit unit) {
096    return rawTable.getOperationTimeout(unit);
097  }
098
099  @Override
100  public long getScanTimeout(TimeUnit unit) {
101    return rawTable.getScanTimeout(unit);
102  }
103
104  @Override
105  public Map<String, byte[]> getRequestAttributes() {
106    return rawTable.getRequestAttributes();
107  }
108
109  private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
110    return FutureUtils.wrapFuture(future, pool);
111  }
112
113  @Override
114  public CompletableFuture<Result> get(Get get) {
115    return wrap(rawTable.get(get));
116  }
117
118  @Override
119  public CompletableFuture<Void> put(Put put) {
120    return wrap(rawTable.put(put));
121  }
122
123  @Override
124  public CompletableFuture<Void> delete(Delete delete) {
125    return wrap(rawTable.delete(delete));
126  }
127
128  @Override
129  public CompletableFuture<Result> append(Append append) {
130    return wrap(rawTable.append(append));
131  }
132
133  @Override
134  public CompletableFuture<Result> increment(Increment increment) {
135    return wrap(rawTable.increment(increment));
136  }
137
138  @Override
139  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
140    return new CheckAndMutateBuilder() {
141
142      private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
143
144      @Override
145      public CompletableFuture<Boolean> thenPut(Put put) {
146        return wrap(builder.thenPut(put));
147      }
148
149      @Override
150      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
151        return wrap(builder.thenMutate(mutation));
152      }
153
154      @Override
155      public CompletableFuture<Boolean> thenDelete(Delete delete) {
156        return wrap(builder.thenDelete(delete));
157      }
158
159      @Override
160      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
161        builder.qualifier(qualifier);
162        return this;
163      }
164
165      @Override
166      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
167        builder.timeRange(timeRange);
168        return this;
169      }
170
171      @Override
172      public CheckAndMutateBuilder ifNotExists() {
173        builder.ifNotExists();
174        return this;
175      }
176
177      @Override
178      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
179        builder.ifMatches(compareOp, value);
180        return this;
181      }
182    };
183  }
184
185  @Override
186  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
187    return new CheckAndMutateWithFilterBuilder() {
188
189      private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);
190
191      @Override
192      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
193        builder.timeRange(timeRange);
194        return this;
195      }
196
197      @Override
198      public CompletableFuture<Boolean> thenPut(Put put) {
199        return wrap(builder.thenPut(put));
200      }
201
202      @Override
203      public CompletableFuture<Boolean> thenDelete(Delete delete) {
204        return wrap(builder.thenDelete(delete));
205      }
206
207      @Override
208      public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
209        return wrap(builder.thenMutate(mutation));
210      }
211    };
212  }
213
214  @Override
215  public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
216    return wrap(rawTable.checkAndMutate(checkAndMutate));
217  }
218
219  @Override
220  public List<CompletableFuture<CheckAndMutateResult>>
221    checkAndMutate(List<CheckAndMutate> checkAndMutates) {
222    return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
223  }
224
225  @Override
226  public CompletableFuture<Result> mutateRow(RowMutations mutation) {
227    return wrap(rawTable.mutateRow(mutation));
228  }
229
230  @Override
231  public CompletableFuture<List<Result>> scanAll(Scan scan) {
232    return wrap(rawTable.scanAll(scan));
233  }
234
235  @Override
236  public ResultScanner getScanner(Scan scan) {
237    return rawTable.getScanner(scan);
238  }
239
240  private void scan0(Scan scan, ScanResultConsumer consumer) {
241    Span span = null;
242    try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
243      span = scanner.getSpan();
244      try (Scope ignored = span.makeCurrent()) {
245        if (scan.isScanMetricsEnabled()) {
246          consumer.onScanMetricsCreated(scanner.getScanMetrics());
247        }
248        for (Result result; (result = scanner.next()) != null;) {
249          if (!consumer.onNext(result)) {
250            break;
251          }
252        }
253        consumer.onComplete();
254      }
255    } catch (IOException e) {
256      try (Scope ignored = span.makeCurrent()) {
257        consumer.onError(e);
258      }
259    }
260  }
261
262  @Override
263  public void scan(Scan scan, ScanResultConsumer consumer) {
264    final Context context = Context.current();
265    pool.execute(context.wrap(() -> scan0(scan, consumer)));
266  }
267
268  @Override
269  public List<CompletableFuture<Result>> get(List<Get> gets) {
270    return rawTable.get(gets).stream().map(this::wrap).collect(toList());
271  }
272
273  @Override
274  public List<CompletableFuture<Void>> put(List<Put> puts) {
275    return rawTable.put(puts).stream().map(this::wrap).collect(toList());
276  }
277
278  @Override
279  public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
280    return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
281  }
282
283  @Override
284  public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
285    return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
286  }
287
288  @Override
289  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
290    ServiceCaller<S, R> callable, byte[] row) {
291    return wrap(rawTable.coprocessorService(stubMaker, callable, row));
292  }
293
294  @Override
295  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
296    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
297    CoprocessorCallback<R> callback) {
298    final Context context = Context.current();
299    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
300
301      @Override
302      public void onRegionComplete(RegionInfo region, R resp) {
303        pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
304      }
305
306      @Override
307      public void onRegionError(RegionInfo region, Throwable error) {
308        pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
309      }
310
311      @Override
312      public void onComplete() {
313        pool.execute(context.wrap(callback::onComplete));
314      }
315
316      @Override
317      public void onError(Throwable error) {
318        pool.execute(context.wrap(() -> callback.onError(error)));
319      }
320    };
321    CoprocessorServiceBuilder<S, R> builder =
322      rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
323    return new CoprocessorServiceBuilder<S, R>() {
324
325      @Override
326      public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
327        builder.fromRow(startKey, inclusive);
328        return this;
329      }
330
331      @Override
332      public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
333        builder.toRow(endKey, inclusive);
334        return this;
335      }
336
337      @Override
338      public void execute() {
339        builder.execute();
340      }
341    };
342  }
343}