001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import io.opentelemetry.api.trace.Span;
032import java.io.IOException;
033import java.util.Collections;
034import java.util.Map;
035import java.util.Optional;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.ConcurrentMap;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicReference;
043import org.apache.commons.io.IOUtils;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.AuthUtil;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.MasterNotRunningException;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
051import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
052import org.apache.hadoop.hbase.ipc.RpcClient;
053import org.apache.hadoop.hbase.ipc.RpcClientFactory;
054import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.trace.TraceUtil;
057import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
065import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
066
067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
071
072/**
073 * The implementation of AsyncConnection.
074 */
075@InterfaceAudience.Private
076public class AsyncConnectionImpl implements AsyncConnection {
077
078  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
079
080  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
081    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
082      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
083    10, TimeUnit.MILLISECONDS);
084
085  private final Configuration conf;
086
087  final AsyncConnectionConfiguration connConf;
088
089  private final User user;
090
091  final ConnectionRegistry registry;
092
093  private final int rpcTimeout;
094
095  private final RpcClient rpcClient;
096
097  final RpcControllerFactory rpcControllerFactory;
098
099  private final AsyncRegionLocator locator;
100
101  final AsyncRpcRetryingCallerFactory callerFactory;
102
103  private final NonceGenerator nonceGenerator;
104
105  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
106  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
107
108  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
109
110  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
111    new AtomicReference<>();
112
113  private final Optional<ServerStatisticTracker> stats;
114  private final ClientBackoffPolicy backoffPolicy;
115
116  private ChoreService choreService;
117
118  private volatile boolean closed = false;
119
120  private final String metricsScope;
121  private final Optional<MetricsConnection> metrics;
122
123  private final ClusterStatusListener clusterStatusListener;
124
125  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
126    User user) {
127    this(conf, registry, clusterId, user, Collections.emptyMap());
128  }
129
130  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
131    User user, Map<String, byte[]> connectionAttributes) {
132    this.conf = conf;
133    this.user = user;
134    this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
135
136    if (user.isLoginFromKeytab()) {
137      spawnRenewalChore(user.getUGI());
138    }
139    this.connConf = new AsyncConnectionConfiguration(conf);
140    this.registry = registry;
141    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
142      this.metrics = Optional
143        .of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null));
144    } else {
145      this.metrics = Optional.empty();
146    }
147    this.rpcClient =
148      RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null), connectionAttributes);
149    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
150    this.rpcTimeout =
151      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
152    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
153    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
154    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
155      nonceGenerator = PerClientRandomNonceGenerator.get();
156    } else {
157      nonceGenerator = NO_NONCE_GENERATOR;
158    }
159    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
160    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
161    ClusterStatusListener listener = null;
162    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
163      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
164      // it in, just like clusterId. Not a big problem for now as the default value is false.
165      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
166        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
167      if (listenerClass == null) {
168        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
169      } else {
170        try {
171          listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
172            @Override
173            public void newDead(ServerName sn) {
174              locator.clearCache(sn);
175              rpcClient.cancelConnections(sn);
176            }
177          }, conf, listenerClass);
178        } catch (IOException e) {
179          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
180        }
181      }
182    }
183    this.clusterStatusListener = listener;
184  }
185
186  private void spawnRenewalChore(final UserGroupInformation user) {
187    ChoreService service = getChoreService();
188    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
189  }
190
191  /**
192   * If choreService has not been created yet, create the ChoreService.
193   */
194  synchronized ChoreService getChoreService() {
195    if (isClosed()) {
196      throw new IllegalStateException("connection is already closed");
197    }
198    if (choreService == null) {
199      choreService = new ChoreService("AsyncConn Chore Service");
200    }
201    return choreService;
202  }
203
204  public User getUser() {
205    return user;
206  }
207
208  public ConnectionRegistry getConnectionRegistry() {
209    return registry;
210  }
211
212  @Override
213  public Configuration getConfiguration() {
214    return conf;
215  }
216
217  @Override
218  public void close() {
219    TraceUtil.trace(() -> {
220      // As the code below is safe to be executed in parallel, here we do not use CAS or lock,
221      // just a simple volatile flag.
222      if (closed) {
223        return;
224      }
225      LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
226      if (LOG.isDebugEnabled()) {
227        logCallStack(Thread.currentThread().getStackTrace());
228      }
229      IOUtils.closeQuietly(clusterStatusListener,
230        e -> LOG.warn("failed to close clusterStatusListener", e));
231      IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
232      IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
233      synchronized (this) {
234        if (choreService != null) {
235          choreService.shutdown();
236          choreService = null;
237        }
238      }
239      if (metrics.isPresent()) {
240        MetricsConnection.deleteMetricsConnection(metricsScope);
241      }
242      closed = true;
243    }, "AsyncConnection.close");
244  }
245
246  private void logCallStack(StackTraceElement[] stackTraceElements) {
247    StringBuilder stackBuilder = new StringBuilder("Call stack:");
248    for (StackTraceElement element : stackTraceElements) {
249      stackBuilder.append("\n    at ");
250      stackBuilder.append(element);
251    }
252    stackBuilder.append("\n");
253    LOG.debug(stackBuilder.toString());
254  }
255
256  @Override
257  public boolean isClosed() {
258    return closed;
259  }
260
261  @Override
262  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
263    return new AsyncTableRegionLocatorImpl(tableName, this);
264  }
265
266  // we will override this method for testing retry caller, so do not remove this method.
267  AsyncRegionLocator getLocator() {
268    return locator;
269  }
270
271  // ditto
272  public NonceGenerator getNonceGenerator() {
273    return nonceGenerator;
274  }
275
276  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
277    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
278  }
279
280  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
281    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
282      getStubKey(ClientService.getDescriptor().getName(), serverName),
283      () -> createRegionServerStub(serverName));
284  }
285
286  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
287    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
288  }
289
290  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
291    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
292  }
293
294  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
295    return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
296      getStubKey(AdminService.getDescriptor().getName(), serverName),
297      () -> createAdminServerStub(serverName));
298  }
299
300  CompletableFuture<MasterService.Interface> getMasterStub() {
301    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
302      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
303      addListener(registry.getActiveMaster(), (addr, error) -> {
304        if (error != null) {
305          future.completeExceptionally(error);
306        } else if (addr == null) {
307          future.completeExceptionally(new MasterNotRunningException(
308            "ZooKeeper available but no active master location found"));
309        } else {
310          LOG.debug("The fetched master address is {}", addr);
311          try {
312            future.complete(createMasterStub(addr));
313          } catch (IOException e) {
314            future.completeExceptionally(e);
315          }
316        }
317
318      });
319      return future;
320    }, stub -> true, "master stub");
321  }
322
323  String getClusterId() {
324    try {
325      return registry.getClusterId().get();
326    } catch (InterruptedException | ExecutionException e) {
327      LOG.error("Error fetching cluster ID: ", e);
328    }
329    return null;
330  }
331
332  void clearMasterStubCache(MasterService.Interface stub) {
333    masterStub.compareAndSet(stub, null);
334  }
335
336  Optional<ServerStatisticTracker> getStatisticsTracker() {
337    return stats;
338  }
339
340  ClientBackoffPolicy getBackoffPolicy() {
341    return backoffPolicy;
342  }
343
344  @Override
345  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
346    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
347
348      @Override
349      public AsyncTable<AdvancedScanResultConsumer> build() {
350        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
351      }
352    };
353  }
354
355  @Override
356  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
357    ExecutorService pool) {
358    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
359
360      @Override
361      public AsyncTable<ScanResultConsumer> build() {
362        RawAsyncTableImpl rawTable =
363          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
364        return new AsyncTableImpl(rawTable, pool);
365      }
366    };
367  }
368
369  @Override
370  public AsyncAdminBuilder getAdminBuilder() {
371    return new AsyncAdminBuilderBase(connConf) {
372      @Override
373      public AsyncAdmin build() {
374        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
375      }
376    };
377  }
378
379  @Override
380  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
381    return new AsyncAdminBuilderBase(connConf) {
382      @Override
383      public AsyncAdmin build() {
384        RawAsyncHBaseAdmin rawAdmin =
385          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
386        return new AsyncHBaseAdmin(rawAdmin, pool);
387      }
388    };
389  }
390
391  @Override
392  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
393    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
394  }
395
396  @Override
397  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
398    ExecutorService pool) {
399    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
400      RETRY_TIMER);
401  }
402
403  private Hbck getHbckInternal(ServerName masterServer) {
404    Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName());
405    // we will not create a new connection when creating a new protobuf stub, and for hbck there
406    // will be no performance consideration, so for simplification we will create a new stub every
407    // time instead of caching the stub here.
408    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
409      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
410  }
411
412  @Override
413  public CompletableFuture<Hbck> getHbck() {
414    return TraceUtil.tracedFuture(() -> {
415      CompletableFuture<Hbck> future = new CompletableFuture<>();
416      addListener(registry.getActiveMaster(), (sn, error) -> {
417        if (error != null) {
418          future.completeExceptionally(error);
419        } else {
420          future.complete(getHbckInternal(sn));
421        }
422      });
423      return future;
424    }, "AsyncConnection.getHbck");
425  }
426
427  @Override
428  public Hbck getHbck(ServerName masterServer) {
429    return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck");
430  }
431
432  @Override
433  public void clearRegionLocationCache() {
434    locator.clearCache();
435  }
436
437  Optional<MetricsConnection> getConnectionMetrics() {
438    return metrics;
439  }
440}