001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED; 021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT; 022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS; 023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import io.opentelemetry.api.trace.Span; 032import java.io.IOException; 033import java.util.Collections; 034import java.util.Map; 035import java.util.Optional; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.ConcurrentMap; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.ExecutorService; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.atomic.AtomicReference; 043import org.apache.commons.io.IOUtils; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.AuthUtil; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.MasterNotRunningException; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 051import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 052import org.apache.hadoop.hbase.ipc.RpcClient; 053import org.apache.hadoop.hbase.ipc.RpcClientFactory; 054import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.trace.TraceUtil; 057import org.apache.hadoop.hbase.util.ConcurrentMapUtils; 058import org.apache.hadoop.hbase.util.Threads; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 065import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 066 067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 071 072/** 073 * The implementation of AsyncConnection. 074 */ 075@InterfaceAudience.Private 076public class AsyncConnectionImpl implements AsyncConnection { 077 078 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 079 080 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 081 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) 082 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 083 10, TimeUnit.MILLISECONDS); 084 085 private final Configuration conf; 086 087 final AsyncConnectionConfiguration connConf; 088 089 private final User user; 090 091 final ConnectionRegistry registry; 092 093 private final int rpcTimeout; 094 095 private final RpcClient rpcClient; 096 097 final RpcControllerFactory rpcControllerFactory; 098 099 private final AsyncRegionLocator locator; 100 101 final AsyncRpcRetryingCallerFactory callerFactory; 102 103 private final NonceGenerator nonceGenerator; 104 105 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 106 private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>(); 107 108 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 109 110 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 111 new AtomicReference<>(); 112 113 private final Optional<ServerStatisticTracker> stats; 114 private final ClientBackoffPolicy backoffPolicy; 115 116 private ChoreService choreService; 117 118 private volatile boolean closed = false; 119 120 private final String metricsScope; 121 private final Optional<MetricsConnection> metrics; 122 123 private final ClusterStatusListener clusterStatusListener; 124 125 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 126 User user) { 127 this(conf, registry, clusterId, user, Collections.emptyMap()); 128 } 129 130 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 131 User user, Map<String, byte[]> connectionAttributes) { 132 this.conf = conf; 133 this.user = user; 134 this.metricsScope = MetricsConnection.getScope(conf, clusterId, this); 135 136 if (user.isLoginFromKeytab()) { 137 spawnRenewalChore(user.getUGI()); 138 } 139 this.connConf = new AsyncConnectionConfiguration(conf); 140 this.registry = registry; 141 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 142 this.metrics = Optional 143 .of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null)); 144 } else { 145 this.metrics = Optional.empty(); 146 } 147 this.rpcClient = 148 RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null), connectionAttributes); 149 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 150 this.rpcTimeout = 151 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 152 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 153 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 154 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 155 nonceGenerator = PerClientRandomNonceGenerator.get(); 156 } else { 157 nonceGenerator = NO_NONCE_GENERATOR; 158 } 159 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); 160 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 161 ClusterStatusListener listener = null; 162 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) { 163 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass 164 // it in, just like clusterId. Not a big problem for now as the default value is false. 165 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass( 166 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); 167 if (listenerClass == null) { 168 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); 169 } else { 170 try { 171 listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() { 172 @Override 173 public void newDead(ServerName sn) { 174 locator.clearCache(sn); 175 rpcClient.cancelConnections(sn); 176 } 177 }, conf, listenerClass); 178 } catch (IOException e) { 179 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); 180 } 181 } 182 } 183 this.clusterStatusListener = listener; 184 } 185 186 private void spawnRenewalChore(final UserGroupInformation user) { 187 ChoreService service = getChoreService(); 188 service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf)); 189 } 190 191 /** 192 * If choreService has not been created yet, create the ChoreService. 193 */ 194 synchronized ChoreService getChoreService() { 195 if (isClosed()) { 196 throw new IllegalStateException("connection is already closed"); 197 } 198 if (choreService == null) { 199 choreService = new ChoreService("AsyncConn Chore Service"); 200 } 201 return choreService; 202 } 203 204 public User getUser() { 205 return user; 206 } 207 208 public ConnectionRegistry getConnectionRegistry() { 209 return registry; 210 } 211 212 @Override 213 public Configuration getConfiguration() { 214 return conf; 215 } 216 217 @Override 218 public void close() { 219 TraceUtil.trace(() -> { 220 // As the code below is safe to be executed in parallel, here we do not use CAS or lock, 221 // just a simple volatile flag. 222 if (closed) { 223 return; 224 } 225 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); 226 if (LOG.isDebugEnabled()) { 227 logCallStack(Thread.currentThread().getStackTrace()); 228 } 229 IOUtils.closeQuietly(clusterStatusListener, 230 e -> LOG.warn("failed to close clusterStatusListener", e)); 231 IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e)); 232 IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e)); 233 synchronized (this) { 234 if (choreService != null) { 235 choreService.shutdown(); 236 choreService = null; 237 } 238 } 239 if (metrics.isPresent()) { 240 MetricsConnection.deleteMetricsConnection(metricsScope); 241 } 242 closed = true; 243 }, "AsyncConnection.close"); 244 } 245 246 private void logCallStack(StackTraceElement[] stackTraceElements) { 247 StringBuilder stackBuilder = new StringBuilder("Call stack:"); 248 for (StackTraceElement element : stackTraceElements) { 249 stackBuilder.append("\n at "); 250 stackBuilder.append(element); 251 } 252 stackBuilder.append("\n"); 253 LOG.debug(stackBuilder.toString()); 254 } 255 256 @Override 257 public boolean isClosed() { 258 return closed; 259 } 260 261 @Override 262 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 263 return new AsyncTableRegionLocatorImpl(tableName, this); 264 } 265 266 // we will override this method for testing retry caller, so do not remove this method. 267 AsyncRegionLocator getLocator() { 268 return locator; 269 } 270 271 // ditto 272 public NonceGenerator getNonceGenerator() { 273 return nonceGenerator; 274 } 275 276 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 277 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 278 } 279 280 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 281 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs, 282 getStubKey(ClientService.getDescriptor().getName(), serverName), 283 () -> createRegionServerStub(serverName)); 284 } 285 286 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 287 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 288 } 289 290 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 291 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 292 } 293 294 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 295 return ConcurrentMapUtils.computeIfAbsentEx(adminSubs, 296 getStubKey(AdminService.getDescriptor().getName(), serverName), 297 () -> createAdminServerStub(serverName)); 298 } 299 300 CompletableFuture<MasterService.Interface> getMasterStub() { 301 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { 302 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); 303 addListener(registry.getActiveMaster(), (addr, error) -> { 304 if (error != null) { 305 future.completeExceptionally(error); 306 } else if (addr == null) { 307 future.completeExceptionally(new MasterNotRunningException( 308 "ZooKeeper available but no active master location found")); 309 } else { 310 LOG.debug("The fetched master address is {}", addr); 311 try { 312 future.complete(createMasterStub(addr)); 313 } catch (IOException e) { 314 future.completeExceptionally(e); 315 } 316 } 317 318 }); 319 return future; 320 }, stub -> true, "master stub"); 321 } 322 323 String getClusterId() { 324 try { 325 return registry.getClusterId().get(); 326 } catch (InterruptedException | ExecutionException e) { 327 LOG.error("Error fetching cluster ID: ", e); 328 } 329 return null; 330 } 331 332 void clearMasterStubCache(MasterService.Interface stub) { 333 masterStub.compareAndSet(stub, null); 334 } 335 336 Optional<ServerStatisticTracker> getStatisticsTracker() { 337 return stats; 338 } 339 340 ClientBackoffPolicy getBackoffPolicy() { 341 return backoffPolicy; 342 } 343 344 @Override 345 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 346 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 347 348 @Override 349 public AsyncTable<AdvancedScanResultConsumer> build() { 350 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 351 } 352 }; 353 } 354 355 @Override 356 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 357 ExecutorService pool) { 358 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 359 360 @Override 361 public AsyncTable<ScanResultConsumer> build() { 362 RawAsyncTableImpl rawTable = 363 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 364 return new AsyncTableImpl(rawTable, pool); 365 } 366 }; 367 } 368 369 @Override 370 public AsyncAdminBuilder getAdminBuilder() { 371 return new AsyncAdminBuilderBase(connConf) { 372 @Override 373 public AsyncAdmin build() { 374 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 375 } 376 }; 377 } 378 379 @Override 380 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 381 return new AsyncAdminBuilderBase(connConf) { 382 @Override 383 public AsyncAdmin build() { 384 RawAsyncHBaseAdmin rawAdmin = 385 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 386 return new AsyncHBaseAdmin(rawAdmin, pool); 387 } 388 }; 389 } 390 391 @Override 392 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 393 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 394 } 395 396 @Override 397 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 398 ExecutorService pool) { 399 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 400 RETRY_TIMER); 401 } 402 403 private Hbck getHbckInternal(ServerName masterServer) { 404 Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName()); 405 // we will not create a new connection when creating a new protobuf stub, and for hbck there 406 // will be no performance consideration, so for simplification we will create a new stub every 407 // time instead of caching the stub here. 408 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 409 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 410 } 411 412 @Override 413 public CompletableFuture<Hbck> getHbck() { 414 return TraceUtil.tracedFuture(() -> { 415 CompletableFuture<Hbck> future = new CompletableFuture<>(); 416 addListener(registry.getActiveMaster(), (sn, error) -> { 417 if (error != null) { 418 future.completeExceptionally(error); 419 } else { 420 future.complete(getHbckInternal(sn)); 421 } 422 }); 423 return future; 424 }, "AsyncConnection.getHbck"); 425 } 426 427 @Override 428 public Hbck getHbck(ServerName masterServer) { 429 return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck"); 430 } 431 432 @Override 433 public void clearRegionLocationCache() { 434 locator.clearCache(); 435 } 436 437 Optional<MetricsConnection> getConnectionMetrics() { 438 return metrics; 439 } 440}