001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS; 021import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; 022import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 027import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 028import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 029import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx; 030 031import edu.umd.cs.findbugs.annotations.Nullable; 032import io.opentelemetry.api.trace.Span; 033import io.opentelemetry.context.Scope; 034import java.io.Closeable; 035import java.io.IOException; 036import java.io.InterruptedIOException; 037import 
java.lang.reflect.UndeclaredThrowableException; 038import java.util.ArrayList; 039import java.util.Collections; 040import java.util.Date; 041import java.util.List; 042import java.util.Map; 043import java.util.concurrent.BlockingQueue; 044import java.util.concurrent.CompletableFuture; 045import java.util.concurrent.ConcurrentHashMap; 046import java.util.concurrent.ConcurrentMap; 047import java.util.concurrent.ExecutionException; 048import java.util.concurrent.ExecutorService; 049import java.util.concurrent.LinkedBlockingQueue; 050import java.util.concurrent.ThreadPoolExecutor; 051import java.util.concurrent.TimeUnit; 052import java.util.concurrent.atomic.AtomicInteger; 053import java.util.concurrent.locks.ReentrantLock; 054import java.util.function.Supplier; 055import java.util.stream.Collectors; 056import org.apache.hadoop.conf.Configuration; 057import org.apache.hadoop.hbase.AuthUtil; 058import org.apache.hadoop.hbase.CatalogReplicaMode; 059import org.apache.hadoop.hbase.ChoreService; 060import org.apache.hadoop.hbase.DoNotRetryIOException; 061import org.apache.hadoop.hbase.HBaseServerException; 062import org.apache.hadoop.hbase.HConstants; 063import org.apache.hadoop.hbase.HRegionLocation; 064import org.apache.hadoop.hbase.MasterNotRunningException; 065import org.apache.hadoop.hbase.MetaTableAccessor; 066import org.apache.hadoop.hbase.RegionLocations; 067import org.apache.hadoop.hbase.ServerName; 068import org.apache.hadoop.hbase.TableName; 069import org.apache.hadoop.hbase.TableNotEnabledException; 070import org.apache.hadoop.hbase.TableNotFoundException; 071import org.apache.hadoop.hbase.ZooKeeperConnectionException; 072import org.apache.hadoop.hbase.client.Scan.ReadType; 073import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 074import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 075import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; 076import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 
077import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 078import org.apache.hadoop.hbase.exceptions.RegionMovedException; 079import org.apache.hadoop.hbase.ipc.RpcClient; 080import org.apache.hadoop.hbase.ipc.RpcClientFactory; 081import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 082import org.apache.hadoop.hbase.log.HBaseMarkers; 083import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 084import org.apache.hadoop.hbase.security.User; 085import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 086import org.apache.hadoop.hbase.trace.TraceUtil; 087import org.apache.hadoop.hbase.util.Bytes; 088import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 089import org.apache.hadoop.hbase.util.ExceptionUtil; 090import org.apache.hadoop.hbase.util.Pair; 091import org.apache.hadoop.hbase.util.ReflectionUtils; 092import org.apache.hadoop.hbase.util.Threads; 093import org.apache.hadoop.ipc.RemoteException; 094import org.apache.hadoop.security.UserGroupInformation; 095import org.apache.yetus.audience.InterfaceAudience; 096import org.apache.zookeeper.KeeperException; 097import org.slf4j.Logger; 098import org.slf4j.LoggerFactory; 099 100import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 101import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 102import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 103import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 104import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 105import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 106 107import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 108import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest; 111import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.BootstrapNodeService; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesRequest; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos.GetAllBootstrapNodesResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 131import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest; 135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse; 136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest; 137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse; 138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 143import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 144import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 145import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 146import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 147import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 148import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 149import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 150import 
org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 151import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 152import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 153import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 154import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersRequest; 155import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLiveRegionServersResponse; 156import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; 157import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 158import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 159import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 160import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 161import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 162import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 163import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 164import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 165import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; 166import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; 167import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; 168import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; 169import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; 170import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; 171import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 172import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 173import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 174import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 175import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; 176import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; 177import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 178import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 179 180/** 181 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. Encapsulates 182 * connection to zookeeper and regionservers. 
 * <p>
 * The shared batch pool and meta lookup pool are created lazily on first use, unless the caller
 * supplies a batch pool to the constructor.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value = "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification = "Access to the conncurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
public class ConnectionImplementation implements ClusterConnection, Closeable {
  /** Name of the {@code hbase.client.retries.by.server} configuration property. */
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";

  /**
   * Number of seconds the master-running state may be cached. The constructor treats a value
   * {@code <= 0} (the default) as "do not cache".
   */
  public static final String MASTER_STATE_CACHE_TIMEOUT_SEC =
    "hbase.client.master.state.cache.timeout.sec";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  // The mode tells if HedgedRead, LoadBalance mode is supported.
  // The default mode is CatalogReplicaMode.None.
  // Resolved in the constructor from LOCATOR_META_REPLICAS_MODE; when that resolves to NONE, the
  // legacy USE_META_REPLICAS flag may upgrade it to HEDGED_READ (hence not final).
  private CatalogReplicaMode metaReplicaMode;
  // Only assigned by the constructor when metaReplicaMode is LOAD_BALANCE; otherwise null.
  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;

  private final int metaReplicaCallTimeoutScanInMicroSecond;
  // Total attempts: the configured retry count converted with retries2Attempts().
  private final int numTries;
  // Read from HConstants.HBASE_RPC_TIMEOUT_KEY; passed when creating blocking RPC channels.
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope. Once
   * it's set under nonceGeneratorCreateLock, it is never unset or changed.
   * <p>
   * NOTE(review): the constructor assigns NO_NONCE_GENERATOR outside the lock when client nonces
   * are disabled, and injectNonceGeneratorForTesting overwrites it, so "never changed" only holds
   * while every connection keeps nonces enabled and no test hook runs.
   */
  // XXX: It is a bad pattern to assign a value to a static field from a constructor. However
  // it would likely change semantics if we change it because the NonceGenerator is selected
  // from configuration passed in as a parameter of the constructor. This has been cleaned up
  // in later branches.
  private static volatile NonceGenerator nonceGenerator = null;
  /**
   * The nonce generator lock. Guards lazy creation of the shared {@link #nonceGenerator} when a
   * connection is constructed.
   */
  private static final Object nonceGeneratorCreateLock = new Object();

  private final AsyncProcess asyncProcess;
  // single tracker per connection
  private final ServerStatisticTracker stats;

  private volatile boolean closed;
  private volatile boolean aborted;

  // package protected for the tests
  // The constructor only creates it when STATUS_PUBLISHED is enabled and a listener class is set.
  ClusterStatusListener clusterStatusListener;

  private final Object metaRegionLock = new Object();

  private final Object masterLock = new Object();

  // thread executor shared by all Table instances created
  // by this connection
  // Either supplied by the caller, or created lazily by getBatchPool() (which then sets
  // cleanupPool so that shutdownPools() shuts it down).
  private volatile ThreadPoolExecutor batchPool = null;
  // meta thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor metaLookupPool = null;
  // True only when this connection created batchPool itself and therefore owns its shutdown.
  private volatile boolean cleanupPool = false;

  private final Configuration conf;

  // cache the configuration value for tables so that we can avoid calling
  // the expensive Configuration to fetch the value multiple times.
  private final ConnectionConfiguration connectionConfig;

  // Client rpc instance.
  private final RpcClient rpcClient;

  private final MetaCache metaCache;

  // Set by the constructor only when client-side metrics are enabled; otherwise null.
  private String metricsScope = null;
  // Null when client-side metrics (CLIENT_SIDE_METRICS_ENABLED_KEY) are disabled.
  private final MetricsConnection metrics;

  protected User user;

  private final RpcRetryingCallerFactory rpcCallerFactory;

  private final RpcControllerFactory rpcControllerFactory;

  private final RetryingCallerInterceptor interceptor;

  /**
   * Cluster registry of basic info such as clusterid and meta region location.
   */
  private final ConnectionRegistry registry;

  private final ClientBackoffPolicy backoffPolicy;

  /**
   * Allow setting an alternate BufferedMutator implementation via config. If null, use default.
   */
  private final String alternateBufferedMutatorClassName;

  /** lock guards against multiple threads trying to query the meta region at the same time */
  private final ReentrantLock userRegionLock = new ReentrantLock();

  /**
   * Supplier to get masterState. By default uses simple supplier without TTL cache. When
   * hbase.client.master.state.cache.timeout.sec > 0 it uses TTL Cache.
   */
  private final Supplier<Boolean> masterStateSupplier;

  // Created lazily by getChoreService().
  private ChoreService choreService;

  /**
   * Creates a connection that builds its own registry and sends no extra connection attributes.
   * @param conf Configuration object
   * @param pool batch pool to share with tables; may be null, in which case one is created lazily
   * @param user user to connect as; may be null
   * @throws IOException if the connection cannot be set up
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
    this(conf, pool, user, null, Collections.emptyMap());
  }

  /**
   * Creates a connection that builds its own registry.
   * @param conf                 Configuration object
   * @param pool                 batch pool to share with tables; may be null
   * @param user                 user to connect as; may be null
   * @param connectionAttributes attributes handed to the RPC client
   * @throws IOException if the connection cannot be set up
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
    Map<String, byte[]> connectionAttributes) throws IOException {
    this(conf, pool, user, null, connectionAttributes);
  }

  /**
   * Constructor, for creating cluster connection with provided ConnectionRegistry.
   * @param conf     Configuration object
   * @param pool     batch pool to share with tables; may be null
   * @param user     user to connect as; may be null
   * @param registry registry to use; null means one is created from {@code conf}
   * @throws IOException if the connection cannot be set up
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
    ConnectionRegistry registry) throws IOException {
    this(conf, pool, user, registry, Collections.emptyMap());
  }

  /**
   * Constructor, for creating cluster connection with provided ConnectionRegistry.
   * <p>
   * Every step inside the guarded section (registry creation through cluster-status listener
   * setup) is wrapped so that, on any failure, {@link #close()} is called before the failure is
   * rethrown.
   * <p>
   * NOTE(review): the keytab renewal chore (and thus the ChoreService) and the {@code pool} cast
   * happen before the guarded section, so a failure there does not reach close() — confirm
   * whether the chore service can leak in that case.
   * @param conf                 Configuration object
   * @param pool                 batch pool to share; cast to {@link ThreadPoolExecutor}, may be
   *                             null (one is then created lazily)
   * @param user                 user to connect as; may be null
   * @param registry             registry to use, or null to create one from {@code conf}
   * @param connectionAttributes attributes handed to the RPC client
   * @throws IOException if a component of the connection fails to initialize
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
    ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
    this.conf = conf;
    this.user = user;
    if (user != null && user.isLoginFromKeytab()) {
      spawnRenewalChore(user.getUGI());
    }
    // NOTE(review): a caller-supplied pool that is not a ThreadPoolExecutor fails here with
    // ClassCastException.
    this.batchPool = (ThreadPoolExecutor) pool;
    this.connectionConfig = new ConnectionConfiguration(conf);
    this.closed = false;
    this.metaReplicaCallTimeoutScanInMicroSecond =
      connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

    // how many times to try, one more than max *retry* time
    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
    this.rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
      // The generator is a static field shared by all instances; create it at most once.
      synchronized (nonceGeneratorCreateLock) {
        if (nonceGenerator == null) {
          nonceGenerator = PerClientRandomNonceGenerator.get();
        }
      }
    } else {
      // NOTE(review): written outside the lock, and replaces the generator for all instances of
      // this class, not just this connection.
      nonceGenerator = NO_NONCE_GENERATOR;
    }

    this.stats = ServerStatisticTracker.create(conf);
    this.interceptor = new RetryingCallerInterceptorFactory(conf).build();

    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);

    boolean shouldListen =
      conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
    Class<? extends ClusterStatusListener.Listener> listenerClass =
      conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
        ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);

    // Is there an alternate BufferedMutator to use?
    this.alternateBufferedMutatorClassName = this.conf.get(BufferedMutator.CLASSNAME_KEY);

    try {
      if (registry == null) {
        this.registry = ConnectionRegistryFactory.getRegistry(conf, user);
      } else {
        this.registry = registry;
      }
      // Needs the registry; the cluster id it sets is used by metrics scope and the RPC client.
      retrieveClusterId();

      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
        this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
        this.metrics = MetricsConnection.getMetricsConnection(conf, this.metricsScope,
          this::getBatchPool, this::getMetaLookupPool);
      } else {
        this.metrics = null;
      }
      this.metaCache = new MetaCache(this.metrics);

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics,
        connectionAttributes);
      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, connectionConfig,
        interceptor, this.stats, this.metrics);
      this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

      // Do we publish the status?
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but "
            + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          // On a dead-server notification, drop cached locations and open RPC connections to it.
          clusterStatusListener =
            new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
              @Override
              public void newDead(ServerName sn) {
                clearCaches(sn);
                rpcClient.cancelConnections(sn);
              }
            }, conf, listenerClass);
        }
      }
    } catch (Throwable e) {
      // avoid leaks: registry, rpcClient, ...
      LOG.debug("connection construction failed", e);
      close();
      throw e;
    }

    // Get the region locator's meta replica mode.
    this.metaReplicaMode = CatalogReplicaMode
      .fromString(conf.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        String replicaSelectorClass =
          conf.get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
            CatalogReplicaLoadBalanceSimpleSelector.class.getName());

        // The supplier reports the number of meta region locations, falling back to 1 when the
        // registry lookup fails or times out (read RPC timeout).
        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
          .createSelector(replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
            int numOfReplicas = 1;
            try {
              RegionLocations metaLocations = this.registry.getMetaRegionLocations()
                .get(connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
              numOfReplicas = metaLocations.size();
            } catch (Exception e) {
              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
            }
            return numOfReplicas;
          });
        break;
      case NONE:
        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.

        boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
        if (useMetaReplicas) {
          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
        }
        break;
      default:
        // Doing nothing
    }

    // A non-positive timeout disables caching; otherwise the master-running check is memoized
    // for that many seconds.
    long masterStateCacheTimeout = conf.getLong(MASTER_STATE_CACHE_TIMEOUT_SEC, 0);

    Supplier<Boolean> masterConnSupplier = masterConnectionStateSupplier();
    if (masterStateCacheTimeout <= 0L) {
      this.masterStateSupplier = masterConnSupplier;
    } else {
      this.masterStateSupplier = Suppliers.memoizeWithExpiration(masterConnSupplier::get,
        masterStateCacheTimeout, TimeUnit.SECONDS);
    }
  }

  /**
   * Visible for tests
   * <p>
   * Returns a supplier that asks the cached master service whether the master is running. It
   * yields false when no master stub is cached, and false (after logging) when the check throws
   * an {@link UndeclaredThrowableException} or an {@link IOException}.
   */
  Supplier<Boolean> masterConnectionStateSupplier() {
    return () -> {
      if (this.masterServiceState.getStub() == null) {
        return false;
      }
      try {
        LOG.trace("Getting master state using rpc call");
        return this.masterServiceState.isMasterRunning();
      } catch (UndeclaredThrowableException e) {
        // It's somehow messy, but we can receive exceptions such as
        // java.net.ConnectException but they're not declared. So we catch it...
        LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
        return false;
      } catch (IOException se) {
        LOG.warn("Checking master connection", se);
        return false;
      }
    };
  }

  /**
   * Schedules the auth-renewal chore from {@link AuthUtil#getAuthRenewalChore} for {@code user}
   * on this connection's ChoreService, creating that service if needed.
   * @param user the keytab-logged-in UGI whose credentials should be renewed
   */
  private void spawnRenewalChore(final UserGroupInformation user) {
    ChoreService service = getChoreService();
    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
  }

  /**
   * Test hook that swaps the nonce generator. The generator is a static field, so although a
   * connection is passed in, the replacement affects every instance of this class.
   * @param conn The connection for which to replace the generator.
   * @param cnm Replaces the nonce generator used, for testing.
   * @return old nonce generator.
477 */ 478 static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn, NonceGenerator cnm) { 479 ConnectionImplementation connImpl = (ConnectionImplementation) conn; 480 NonceGenerator ng = connImpl.getNonceGenerator(); 481 LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName()); 482 nonceGenerator = cnm; 483 return ng; 484 } 485 486 @Override 487 public Table getTable(TableName tableName) throws IOException { 488 return getTable(tableName, getBatchPool()); 489 } 490 491 @Override 492 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 493 return new TableBuilderBase(tableName, connectionConfig) { 494 495 @Override 496 public Table build() { 497 return new HTable(ConnectionImplementation.this, this, rpcCallerFactory, 498 rpcControllerFactory, pool, requestAttributes); 499 } 500 }; 501 } 502 503 @Override 504 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { 505 if (params.getTableName() == null) { 506 throw new IllegalArgumentException("TableName cannot be null."); 507 } 508 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { 509 params.writeBufferSize(connectionConfig.getWriteBufferSize()); 510 } 511 if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) { 512 params.setWriteBufferPeriodicFlushTimeoutMs( 513 connectionConfig.getWriteBufferPeriodicFlushTimeoutMs()); 514 } 515 if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) { 516 params.setWriteBufferPeriodicFlushTimerTickMs( 517 connectionConfig.getWriteBufferPeriodicFlushTimerTickMs()); 518 } 519 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { 520 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); 521 } 522 // Look to see if an alternate BufferedMutation implementation is wanted. 523 // Look in params and in config. If null, use default. 
524 String implementationClassName = params.getImplementationClassName(); 525 if (implementationClassName == null) { 526 implementationClassName = this.alternateBufferedMutatorClassName; 527 } 528 if (implementationClassName == null) { 529 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); 530 } 531 try { 532 return (BufferedMutator) ReflectionUtils.newInstance(Class.forName(implementationClassName), 533 this, rpcCallerFactory, rpcControllerFactory, params); 534 } catch (ClassNotFoundException e) { 535 throw new RuntimeException(e); 536 } 537 } 538 539 @Override 540 public BufferedMutator getBufferedMutator(TableName tableName) { 541 return getBufferedMutator(new BufferedMutatorParams(tableName)); 542 } 543 544 @Override 545 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 546 return new HRegionLocator(tableName, this); 547 } 548 549 @Override 550 public Admin getAdmin() throws IOException { 551 return new HBaseAdmin(this); 552 } 553 554 @Override 555 public Hbck getHbck() throws IOException { 556 return TraceUtil.trace(() -> getHbck(get(registry.getActiveMaster())), 557 () -> TraceUtil.createSpan(this.getClass().getSimpleName() + ".getHbck")); 558 } 559 560 @Override 561 public Hbck getHbck(ServerName masterServer) throws IOException { 562 return TraceUtil.trace(() -> { 563 checkClosed(); 564 if (isDeadServer(masterServer)) { 565 throw new RegionServerStoppedException(masterServer + " is dead."); 566 } 567 String key = 568 getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), masterServer); 569 570 return new HBaseHbck( 571 (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 572 BlockingRpcChannel channel = 573 this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout); 574 return MasterProtos.HbckService.newBlockingStub(channel); 575 }), rpcControllerFactory); 576 }, () -> TraceUtil.createSpan(this.getClass().getSimpleName() + ".getHbck") 
577 .setAttribute(HBaseSemanticAttributes.SERVER_NAME_KEY, masterServer.getServerName())); 578 } 579 580 @Override 581 public MetricsConnection getConnectionMetrics() { 582 return this.metrics; 583 } 584 585 @Override 586 public User getUser() { 587 return user; 588 } 589 590 @Override 591 public ConnectionRegistry getConnectionRegistry() { 592 return registry; 593 } 594 595 private ThreadPoolExecutor getBatchPool() { 596 if (batchPool == null) { 597 synchronized (this) { 598 if (batchPool == null) { 599 int threads = conf.getInt("hbase.hconnection.threads.max", 256); 600 this.batchPool = getThreadPool(threads, threads, "-shared", null); 601 this.cleanupPool = true; 602 } 603 } 604 } 605 return this.batchPool; 606 } 607 608 private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint, 609 BlockingQueue<Runnable> passedWorkQueue) { 610 // shared HTable thread executor not yet initialized 611 if (maxThreads == 0) { 612 maxThreads = Runtime.getRuntime().availableProcessors() * 8; 613 } 614 if (coreThreads == 0) { 615 coreThreads = Runtime.getRuntime().availableProcessors() * 8; 616 } 617 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); 618 BlockingQueue<Runnable> workQueue = passedWorkQueue; 619 if (workQueue == null) { 620 workQueue = 621 new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 622 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 623 coreThreads = maxThreads; 624 } 625 ThreadPoolExecutor tpe = 626 new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, 627 new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + nameHint + "-pool-%d") 628 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 629 tpe.allowCoreThreadTimeOut(true); 630 return tpe; 631 } 632 633 private ThreadPoolExecutor getMetaLookupPool() { 634 if (this.metaLookupPool == null) { 635 synchronized (this) { 636 if 
(this.metaLookupPool == null) { 637 // Some of the threads would be used for meta replicas 638 // To start with, threads.max.core threads can hit the meta (including replicas). 639 // After that, requests will get queued up in the passed queue, and only after 640 // the queue is full, a new thread will be started 641 int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128); 642 this.metaLookupPool = 643 getThreadPool(threads, threads, "-metaLookup-shared-", new LinkedBlockingQueue<>()); 644 } 645 } 646 } 647 return this.metaLookupPool; 648 } 649 650 protected ExecutorService getCurrentMetaLookupPool() { 651 return metaLookupPool; 652 } 653 654 protected ExecutorService getCurrentBatchPool() { 655 return batchPool; 656 } 657 658 private void shutdownPools() { 659 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { 660 shutdownBatchPool(this.batchPool); 661 } 662 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { 663 shutdownBatchPool(this.metaLookupPool); 664 } 665 } 666 667 private void shutdownBatchPool(ExecutorService pool) { 668 pool.shutdown(); 669 try { 670 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 671 pool.shutdownNow(); 672 } 673 } catch (InterruptedException e) { 674 pool.shutdownNow(); 675 } 676 } 677 678 /** 679 * For tests only. 680 */ 681 RpcClient getRpcClient() { 682 return rpcClient; 683 } 684 685 /** 686 * An identifier that will remain the same for a given connection. 
687 */ 688 @Override 689 public String toString() { 690 return "hconnection-0x" + Integer.toHexString(hashCode()); 691 } 692 693 protected String clusterId = null; 694 695 protected void retrieveClusterId() { 696 if (clusterId != null) { 697 return; 698 } 699 try { 700 this.clusterId = this.registry.getClusterId().get(); 701 } catch (InterruptedException | ExecutionException e) { 702 LOG.warn("Retrieve cluster id failed", e); 703 } 704 if (clusterId == null) { 705 clusterId = HConstants.CLUSTER_ID_DEFAULT; 706 LOG.debug("clusterid came back null, using default " + clusterId); 707 } 708 } 709 710 /** 711 * If choreService has not been created yet, create the ChoreService. 712 */ 713 synchronized ChoreService getChoreService() { 714 if (choreService == null) { 715 choreService = new ChoreService("AsyncConn Chore Service"); 716 } 717 return choreService; 718 } 719 720 @Override 721 public Configuration getConfiguration() { 722 return this.conf; 723 } 724 725 private void checkClosed() throws LocalConnectionClosedException { 726 if (this.closed) { 727 throw new LocalConnectionClosedException(toString() + " closed"); 728 } 729 } 730 731 /** 732 * Like {@link ConnectionClosedException} but thrown from the checkClosed call which looks at the 733 * local this.closed flag. We use this rather than {@link ConnectionClosedException} because the 734 * latter does not inherit from DoNotRetryIOE (it should. TODO). 
735 */ 736 private static class LocalConnectionClosedException extends DoNotRetryIOException { 737 LocalConnectionClosedException(String message) { 738 super(message); 739 } 740 } 741 742 /** 743 * @return true if the master is running, throws an exception otherwise 744 * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running 745 * @deprecated this has been deprecated without a replacement 746 */ 747 @Deprecated 748 @Override 749 public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { 750 // When getting the master connection, we check it's running, 751 // so if there is no exception, it means we've been able to get a 752 // connection on a running master 753 MasterKeepAliveConnection m; 754 try { 755 m = getKeepAliveMasterService(); 756 } catch (IOException e) { 757 throw new MasterNotRunningException(e); 758 } 759 m.close(); 760 return true; 761 } 762 763 @Override 764 public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row, 765 boolean reload) throws IOException { 766 return reload ? 
relocateRegion(tableName, row) : locateRegion(tableName, row); 767 } 768 769 @Override 770 public boolean isTableEnabled(TableName tableName) throws IOException { 771 return getTableState(tableName).inStates(TableState.State.ENABLED); 772 } 773 774 @Override 775 public boolean isTableDisabled(TableName tableName) throws IOException { 776 return getTableState(tableName).inStates(TableState.State.DISABLED); 777 } 778 779 @Override 780 public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys) 781 throws IOException { 782 checkClosed(); 783 try { 784 if (!isTableEnabled(tableName)) { 785 LOG.debug("Table {} not enabled", tableName); 786 return false; 787 } 788 if (TableName.isMetaTableName(tableName)) { 789 // meta table is always available 790 return true; 791 } 792 List<Pair<RegionInfo, ServerName>> locations = 793 MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true); 794 795 int notDeployed = 0; 796 int regionCount = 0; 797 for (Pair<RegionInfo, ServerName> pair : locations) { 798 RegionInfo info = pair.getFirst(); 799 if (pair.getSecond() == null) { 800 LOG.debug("Table {} has not deployed region {}", tableName, 801 pair.getFirst().getEncodedName()); 802 notDeployed++; 803 } else 804 if (splitKeys != null && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 805 for (byte[] splitKey : splitKeys) { 806 // Just check if the splitkey is available 807 if (Bytes.equals(info.getStartKey(), splitKey)) { 808 regionCount++; 809 break; 810 } 811 } 812 } else { 813 // Always empty start row should be counted 814 regionCount++; 815 } 816 } 817 if (notDeployed > 0) { 818 if (LOG.isDebugEnabled()) { 819 LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed); 820 } 821 return false; 822 } else if (splitKeys != null && regionCount != splitKeys.length + 1) { 823 if (LOG.isDebugEnabled()) { 824 LOG.debug("Table {} expected to have {} regions, but only {} available", tableName, 825 splitKeys.length 
+ 1, regionCount); 826 } 827 return false; 828 } else { 829 LOG.trace("Table {} should be available", tableName); 830 return true; 831 } 832 } catch (TableNotFoundException tnfe) { 833 LOG.warn("Table {} does not exist", tableName); 834 return false; 835 } 836 } 837 838 @Override 839 public HRegionLocation locateRegion(final byte[] regionName) throws IOException { 840 RegionLocations locations = locateRegion(RegionInfo.getTable(regionName), 841 RegionInfo.getStartKey(regionName), false, true); 842 return locations == null ? null : locations.getRegionLocation(); 843 } 844 845 private boolean isDeadServer(ServerName sn) { 846 if (clusterStatusListener == null) { 847 return false; 848 } else { 849 return clusterStatusListener.isDeadServer(sn); 850 } 851 } 852 853 @Override 854 public List<HRegionLocation> locateRegions(TableName tableName) throws IOException { 855 return locateRegions(tableName, false, true); 856 } 857 858 @Override 859 public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache, 860 boolean offlined) throws IOException { 861 List<RegionInfo> regions; 862 if (TableName.isMetaTableName(tableName)) { 863 regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO); 864 } else { 865 regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined); 866 } 867 List<HRegionLocation> locations = new ArrayList<>(); 868 for (RegionInfo regionInfo : regions) { 869 if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) { 870 continue; 871 } 872 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true); 873 if (list != null) { 874 for (HRegionLocation loc : list.getRegionLocations()) { 875 if (loc != null) { 876 locations.add(loc); 877 } 878 } 879 } 880 } 881 return locations; 882 } 883 884 @Override 885 public HRegionLocation locateRegion(final TableName tableName, final byte[] row) 886 throws IOException { 887 RegionLocations locations = locateRegion(tableName, row, true, true); 888 return 
locations == null ? null : locations.getRegionLocation(); 889 } 890 891 @Override 892 public HRegionLocation relocateRegion(final TableName tableName, final byte[] row) 893 throws IOException { 894 RegionLocations locations = 895 relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 896 return locations == null 897 ? null 898 : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID); 899 } 900 901 @Override 902 public RegionLocations relocateRegion(final TableName tableName, final byte[] row, int replicaId) 903 throws IOException { 904 // Since this is an explicit request not to use any caching, finding 905 // disabled tables should not be desirable. This will ensure that an exception is thrown when 906 // the first time a disabled table is interacted with. 907 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) { 908 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); 909 } 910 911 return locateRegion(tableName, row, false, true, replicaId); 912 } 913 914 @Override 915 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 916 boolean retry) throws IOException { 917 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); 918 } 919 920 @Override 921 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 922 boolean retry, int replicaId) throws IOException { 923 checkClosed(); 924 if (tableName == null || tableName.getName().length == 0) { 925 throw new IllegalArgumentException("table name cannot be null or zero length"); 926 } 927 if (tableName.equals(TableName.META_TABLE_NAME)) { 928 return locateMeta(tableName, useCache, replicaId); 929 } else { 930 // Region not in the cache - have to go to the meta RS 931 return locateRegionInMeta(tableName, row, useCache, retry, replicaId); 932 } 933 } 934 935 private RegionLocations locateMeta(final TableName tableName, 
boolean useCache, int replicaId) 936 throws IOException { 937 // HBASE-10785: We cache the location of the META itself, so that we are not overloading 938 // zookeeper with one request for every region lookup. We cache the META with empty row 939 // key in MetaCache. 940 byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta 941 RegionLocations locations = null; 942 if (useCache) { 943 locations = getCachedLocation(tableName, metaCacheKey); 944 if (locations != null && locations.getRegionLocation(replicaId) != null) { 945 return locations; 946 } 947 } 948 949 // only one thread should do the lookup. 950 synchronized (metaRegionLock) { 951 // Check the cache again for a hit in case some other thread made the 952 // same query while we were waiting on the lock. 953 if (useCache) { 954 locations = getCachedLocation(tableName, metaCacheKey); 955 if (locations != null && locations.getRegionLocation(replicaId) != null) { 956 return locations; 957 } 958 } 959 960 // Look up from zookeeper 961 locations = get(this.registry.getMetaRegionLocations()); 962 if (locations != null) { 963 cacheLocation(tableName, locations); 964 } 965 } 966 return locations; 967 } 968 969 /** 970 * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're 971 * seeking. 972 */ 973 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, 974 boolean retry, int replicaId) throws IOException { 975 // If we are supposed to be using the cache, look in the cache to see if we already have the 976 // region. 977 if (useCache) { 978 RegionLocations locations = getCachedLocation(tableName, row); 979 if (locations != null && locations.getRegionLocation(replicaId) != null) { 980 return locations; 981 } 982 } 983 // build the key of the meta region we should be looking for. 984 // the extra 9's on the end are necessary to allow "exact" matches 985 // without knowing the precise region names. 
986 byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); 987 byte[] metaStopKey = 988 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 989 Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 990 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(1) 991 .setReadType(ReadType.PREAD); 992 993 switch (this.metaReplicaMode) { 994 case LOAD_BALANCE: 995 int metaReplicaId = 996 this.metaReplicaSelector.select(tableName, row, RegionLocateType.CURRENT); 997 if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) { 998 // If the selector gives a non-primary meta replica region, then go with it. 999 // Otherwise, just go to primary in non-hedgedRead mode. 1000 s.setConsistency(Consistency.TIMELINE); 1001 s.setReplicaId(metaReplicaId); 1002 } 1003 break; 1004 case HEDGED_READ: 1005 s.setConsistency(Consistency.TIMELINE); 1006 break; 1007 default: 1008 // do nothing 1009 } 1010 int maxAttempts = (retry ? numTries : 1); 1011 boolean relocateMeta = false; 1012 for (int tries = 0;; tries++) { 1013 if (tries >= maxAttempts) { 1014 throw new NoServerForRegionException("Unable to find region for " 1015 + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries."); 1016 } 1017 if (useCache) { 1018 RegionLocations locations = getCachedLocation(tableName, row); 1019 if (locations != null && locations.getRegionLocation(replicaId) != null) { 1020 return locations; 1021 } 1022 } else { 1023 // If we are not supposed to be using the cache, delete any existing cached location 1024 // so it won't interfere. 
1025 // We are only supposed to clean the cache for the specific replicaId 1026 metaCache.clearCache(tableName, row, replicaId); 1027 } 1028 // Query the meta region 1029 long pauseBase = connectionConfig.getPauseMillis(); 1030 long lockStartTime = 0; 1031 boolean lockedUserRegion = false; 1032 try { 1033 takeUserRegionLock(); 1034 lockStartTime = EnvironmentEdgeManager.currentTime(); 1035 lockedUserRegion = true; 1036 // We don't need to check if useCache is enabled or not. Even if useCache is false 1037 // we already cleared the cache for this row before acquiring userRegion lock so if this 1038 // row is present in cache that means some other thread has populated it while we were 1039 // waiting to acquire user region lock. 1040 RegionLocations locations = getCachedLocation(tableName, row); 1041 if (locations != null && locations.getRegionLocation(replicaId) != null) { 1042 return locations; 1043 } 1044 if (relocateMeta) { 1045 relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, 1046 RegionInfo.DEFAULT_REPLICA_ID); 1047 } 1048 s.resetMvccReadPoint(); 1049 final Span span = new TableOperationSpanBuilder(this) 1050 .setTableName(TableName.META_TABLE_NAME).setOperation(s).build(); 1051 try (Scope ignored = span.makeCurrent(); 1052 ReversedClientScanner rcs = 1053 new ReversedClientScanner(conf, s, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, 1054 rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(), 1055 connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond, 1056 connectionConfig, Collections.emptyMap())) { 1057 boolean tableNotFound = true; 1058 for (;;) { 1059 Result regionInfoRow = rcs.next(); 1060 if (regionInfoRow == null) { 1061 if (tableNotFound) { 1062 throw new TableNotFoundException(tableName); 1063 } else { 1064 throw new IOException( 1065 "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName); 1066 } 1067 } 1068 tableNotFound = false; 1069 // 
convert the row result into the HRegionLocation we need! 1070 locations = MetaTableAccessor.getRegionLocations(regionInfoRow); 1071 if (locations == null || locations.getRegionLocation(replicaId) == null) { 1072 throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow); 1073 } 1074 RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion(); 1075 if (regionInfo == null) { 1076 throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME 1077 + ", row=" + regionInfoRow); 1078 } 1079 // See HBASE-20182. It is possible that we locate to a split parent even after the 1080 // children are online, so here we need to skip this region and go to the next one. 1081 if (regionInfo.isSplitParent()) { 1082 continue; 1083 } 1084 if (regionInfo.isOffline()) { 1085 throw new RegionOfflineException( 1086 "Region offline; disable table call? " + regionInfo.getRegionNameAsString()); 1087 } 1088 // It is possible that the split children have not been online yet and we have skipped 1089 // the parent in the above condition, so we may have already reached a region which does 1090 // not contains us. 
1091 if (!regionInfo.containsRow(row)) { 1092 throw new IOException( 1093 "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName); 1094 } 1095 ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); 1096 if (serverName == null) { 1097 throw new NoServerForRegionException("No server address listed in " 1098 + TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() 1099 + " containing row " + Bytes.toStringBinary(row)); 1100 } 1101 if (isDeadServer(serverName)) { 1102 throw new RegionServerStoppedException( 1103 "hbase:meta says the region " + regionInfo.getRegionNameAsString() 1104 + " is managed by the server " + serverName + ", but it is dead."); 1105 } 1106 // Instantiate the location 1107 cacheLocation(tableName, locations); 1108 return locations; 1109 } 1110 } 1111 } catch (TableNotFoundException e) { 1112 // if we got this error, probably means the table just plain doesn't 1113 // exist. rethrow the error immediately. this should always be coming 1114 // from the HTable constructor. 1115 throw e; 1116 } catch (LocalConnectionClosedException cce) { 1117 // LocalConnectionClosedException is specialized instance of DoNotRetryIOE. 1118 // Thrown when we check if this connection is closed. If it is, don't retry. 1119 throw cce; 1120 } catch (IOException e) { 1121 ExceptionUtil.rethrowIfInterrupt(e); 1122 if (e instanceof RemoteException) { 1123 e = ((RemoteException) e).unwrapRemoteException(); 1124 } 1125 if (HBaseServerException.isServerOverloaded(e)) { 1126 // Give a special pause when encountering an exception indicating the server 1127 // is overloaded. 
see #HBASE-17114 and HBASE-26807 1128 pauseBase = connectionConfig.getPauseMillisForServerOverloaded(); 1129 } 1130 if (tries < maxAttempts - 1) { 1131 LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " 1132 + "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e); 1133 } else { 1134 throw e; 1135 } 1136 // Only relocate the parent region if necessary 1137 relocateMeta = 1138 !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException); 1139 1140 if (metrics != null && HBaseServerException.isServerOverloaded(e)) { 1141 metrics.incrementServerOverloadedBackoffTime( 1142 ConnectionUtils.getPauseTime(pauseBase, tries), TimeUnit.MILLISECONDS); 1143 } 1144 } finally { 1145 if (lockedUserRegion) { 1146 userRegionLock.unlock(); 1147 // update duration of the lock being held 1148 if (metrics != null) { 1149 metrics.updateUserRegionLockHeld(EnvironmentEdgeManager.currentTime() - lockStartTime); 1150 } 1151 } 1152 } 1153 try { 1154 Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries)); 1155 } catch (InterruptedException e) { 1156 throw new InterruptedIOException( 1157 "Giving up trying to location region in " + "meta: thread is interrupted."); 1158 } 1159 } 1160 } 1161 1162 void takeUserRegionLock() throws IOException { 1163 try { 1164 long waitTime = connectionConfig.getMetaOperationTimeout(); 1165 if (metrics != null) { 1166 metrics.updateUserRegionLockQueue(userRegionLock.getQueueLength()); 1167 } 1168 final long waitStartTime = EnvironmentEdgeManager.currentTime(); 1169 if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { 1170 if (metrics != null) { 1171 metrics.incrUserRegionLockTimeout(); 1172 } 1173 throw new LockTimeoutException("Failed to get user region lock in" + waitTime + " ms. 
" 1174 + " for accessing meta region server."); 1175 } else if (metrics != null) { 1176 // successfully grabbed the lock, start timer of holding the lock 1177 metrics.updateUserRegionLockWaiting(EnvironmentEdgeManager.currentTime() - waitStartTime); 1178 } 1179 } catch (InterruptedException ie) { 1180 LOG.error("Interrupted while waiting for a lock", ie); 1181 throw ExceptionUtil.asInterrupt(ie); 1182 } 1183 } 1184 1185 /** 1186 * Put a newly discovered HRegionLocation into the cache. 1187 * @param tableName The table name. 1188 * @param location the new location 1189 */ 1190 @Override 1191 public void cacheLocation(final TableName tableName, final RegionLocations location) { 1192 metaCache.cacheLocation(tableName, location); 1193 } 1194 1195 /** 1196 * Search the cache for a location that fits our table and row key. Return null if no suitable 1197 * region is located. 1198 * @return Null or region location found in cache. 1199 */ 1200 RegionLocations getCachedLocation(final TableName tableName, final byte[] row) { 1201 return metaCache.getCachedLocation(tableName, row); 1202 } 1203 1204 public void clearRegionCache(final TableName tableName, byte[] row) { 1205 metaCache.clearCache(tableName, row); 1206 } 1207 1208 /* 1209 * Delete all cached entries of a table that maps to a specific location. 1210 */ 1211 @Override 1212 public void clearCaches(final ServerName serverName) { 1213 metaCache.clearCache(serverName); 1214 } 1215 1216 @Override 1217 public void clearRegionLocationCache() { 1218 metaCache.clearCache(); 1219 } 1220 1221 @Override 1222 public void clearRegionCache(final TableName tableName) { 1223 metaCache.clearCache(tableName); 1224 } 1225 1226 /** 1227 * Put a newly discovered HRegionLocation into the cache. 1228 * @param tableName The table name. 
1229 * @param source the source of the new location, if it's not coming from meta 1230 * @param location the new location 1231 */ 1232 private void cacheLocation(final TableName tableName, final ServerName source, 1233 final HRegionLocation location) { 1234 metaCache.cacheLocation(tableName, source, location); 1235 } 1236 1237 // Map keyed by service name + regionserver to service stub implementation 1238 private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>(); 1239 1240 /** 1241 * State of the MasterService connection/setup. 1242 */ 1243 static class MasterServiceState { 1244 Connection connection; 1245 1246 MasterProtos.MasterService.BlockingInterface stub; 1247 int userCount; 1248 1249 MasterServiceState(final Connection connection) { 1250 super(); 1251 this.connection = connection; 1252 } 1253 1254 @Override 1255 public String toString() { 1256 return "MasterService"; 1257 } 1258 1259 Object getStub() { 1260 return this.stub; 1261 } 1262 1263 void clearStub() { 1264 this.stub = null; 1265 } 1266 1267 boolean isMasterRunning() throws IOException { 1268 MasterProtos.IsMasterRunningResponse response = null; 1269 try { 1270 response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1271 } catch (Exception e) { 1272 throw ProtobufUtil.handleRemoteException(e); 1273 } 1274 return response != null ? response.getIsMasterRunning() : false; 1275 } 1276 } 1277 1278 /** 1279 * The record of errors for servers. 1280 */ 1281 static class ServerErrorTracker { 1282 // We need a concurrent map here, as we could have multiple threads updating it in parallel. 
1283 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = 1284 new ConcurrentHashMap<>(); 1285 private final long canRetryUntil; 1286 private final int maxTries;// max number to try 1287 private final long startTrackingTime; 1288 1289 /** 1290 * Constructor 1291 * @param timeout how long to wait before timeout, in unit of millisecond 1292 * @param maxTries how many times to try 1293 */ 1294 @SuppressWarnings("JavaUtilDate") 1295 public ServerErrorTracker(long timeout, int maxTries) { 1296 this.maxTries = maxTries; 1297 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; 1298 this.startTrackingTime = new Date().getTime(); 1299 } 1300 1301 /** 1302 * We stop to retry when we have exhausted BOTH the number of tries and the time allocated. 1303 * @param numAttempt how many times we have tried by now 1304 */ 1305 boolean canTryMore(int numAttempt) { 1306 // If there is a single try we must not take into account the time. 1307 return numAttempt < maxTries 1308 || (maxTries > 1 && EnvironmentEdgeManager.currentTime() < this.canRetryUntil); 1309 } 1310 1311 /** 1312 * Calculates the back-off time for a retrying request to a particular server. 1313 * @param server The server in question. 1314 * @param basePause The default hci pause. 1315 * @return The time to wait before sending next request. 1316 */ 1317 long calculateBackoffTime(ServerName server, long basePause) { 1318 long result; 1319 ServerErrors errorStats = errorsByServer.get(server); 1320 if (errorStats != null) { 1321 result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1)); 1322 } else { 1323 result = 0; // yes, if the server is not in our list we don't wait before retrying. 1324 } 1325 return result; 1326 } 1327 1328 /** 1329 * Reports that there was an error on the server to do whatever bean-counting necessary. 1330 * @param server The server in question. 
1331 */ 1332 void reportServerError(ServerName server) { 1333 computeIfAbsent(errorsByServer, server, ServerErrors::new).addError(); 1334 } 1335 1336 long getStartTrackingTime() { 1337 return startTrackingTime; 1338 } 1339 1340 /** 1341 * The record of errors for a server. 1342 */ 1343 private static class ServerErrors { 1344 private final AtomicInteger retries = new AtomicInteger(0); 1345 1346 public int getCount() { 1347 return retries.get(); 1348 } 1349 1350 public void addError() { 1351 retries.incrementAndGet(); 1352 } 1353 } 1354 } 1355 1356 /** 1357 * Class to make a MasterServiceStubMaker stub. 1358 */ 1359 private final class MasterServiceStubMaker { 1360 private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) 1361 throws IOException { 1362 try { 1363 stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1364 } catch (ServiceException e) { 1365 throw ProtobufUtil.handleRemoteException(e); 1366 } 1367 } 1368 1369 /** 1370 * Create a stub. Try once only. It is not typed because there is no common type to protobuf 1371 * services nor their interfaces. Let the caller do appropriate casting. 1372 * @return A stub for master services. 
1373 */ 1374 private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() 1375 throws IOException, KeeperException { 1376 ServerName sn = get(registry.getActiveMaster()); 1377 if (sn == null) { 1378 String msg = "ZooKeeper available but no active master location found"; 1379 LOG.info(msg); 1380 throw new MasterNotRunningException(msg); 1381 } 1382 if (isDeadServer(sn)) { 1383 throw new MasterNotRunningException(sn + " is dead."); 1384 } 1385 // Use the security info interface name as our stub key 1386 String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn); 1387 MasterProtos.MasterService.BlockingInterface stub = 1388 (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1389 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); 1390 return MasterProtos.MasterService.newBlockingStub(channel); 1391 }); 1392 isMasterRunning(stub); 1393 return stub; 1394 } 1395 1396 /** 1397 * Create a stub against the master. Retry if necessary. 
1398 * @return A stub to do <code>intf</code> against the master 1399 * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running 1400 */ 1401 MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { 1402 // The lock must be at the beginning to prevent multiple master creations 1403 // (and leaks) in a multithread context 1404 synchronized (masterLock) { 1405 Exception exceptionCaught = null; 1406 if (!closed) { 1407 try { 1408 return makeStubNoRetries(); 1409 } catch (IOException e) { 1410 exceptionCaught = e; 1411 } catch (KeeperException e) { 1412 exceptionCaught = e; 1413 } 1414 throw new MasterNotRunningException(exceptionCaught); 1415 } else { 1416 throw new DoNotRetryIOException("Connection was closed while trying to get master"); 1417 } 1418 } 1419 } 1420 } 1421 1422 @Override 1423 public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException { 1424 return getAdmin(get(registry.getActiveMaster())); 1425 } 1426 1427 @Override 1428 public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName) 1429 throws IOException { 1430 checkClosed(); 1431 if (isDeadServer(serverName)) { 1432 throw new RegionServerStoppedException(serverName + " is dead."); 1433 } 1434 String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName); 1435 return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1436 BlockingRpcChannel channel = 1437 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1438 return AdminProtos.AdminService.newBlockingStub(channel); 1439 }); 1440 } 1441 1442 @Override 1443 public BlockingInterface getClient(ServerName serverName) throws IOException { 1444 checkClosed(); 1445 if (isDeadServer(serverName)) { 1446 throw new RegionServerStoppedException(serverName + " is dead."); 1447 } 1448 String key = 1449 getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), 
serverName); 1450 return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1451 BlockingRpcChannel channel = 1452 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1453 return ClientProtos.ClientService.newBlockingStub(channel); 1454 }); 1455 } 1456 1457 final MasterServiceState masterServiceState = new MasterServiceState(this); 1458 1459 /** 1460 * Visible for tests 1461 */ 1462 MasterServiceState getMasterServiceState() { 1463 return this.masterServiceState; 1464 } 1465 1466 @Override 1467 public MasterKeepAliveConnection getMaster() throws IOException { 1468 return getKeepAliveMasterService(); 1469 } 1470 1471 private void resetMasterServiceState(final MasterServiceState mss) { 1472 mss.userCount++; 1473 } 1474 1475 private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException { 1476 if (!isKeepAliveMasterConnectedAndRunning()) { 1477 synchronized (masterLock) { 1478 if (!isKeepAliveMasterConnectedAndRunning()) { 1479 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); 1480 this.masterServiceState.stub = stubMaker.makeStub(); 1481 } 1482 resetMasterServiceState(this.masterServiceState); 1483 } 1484 } 1485 1486 // Ugly delegation just so we can add in a Close method. 
1487 final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub; 1488 return new MasterKeepAliveConnection() { 1489 MasterServiceState mss = masterServiceState; 1490 1491 @Override 1492 public MasterProtos.AbortProcedureResponse abortProcedure(RpcController controller, 1493 MasterProtos.AbortProcedureRequest request) throws ServiceException { 1494 return stub.abortProcedure(controller, request); 1495 } 1496 1497 @Override 1498 public MasterProtos.GetProceduresResponse getProcedures(RpcController controller, 1499 MasterProtos.GetProceduresRequest request) throws ServiceException { 1500 return stub.getProcedures(controller, request); 1501 } 1502 1503 @Override 1504 public MasterProtos.GetLocksResponse getLocks(RpcController controller, 1505 MasterProtos.GetLocksRequest request) throws ServiceException { 1506 return stub.getLocks(controller, request); 1507 } 1508 1509 @Override 1510 public MasterProtos.AddColumnResponse addColumn(RpcController controller, 1511 MasterProtos.AddColumnRequest request) throws ServiceException { 1512 return stub.addColumn(controller, request); 1513 } 1514 1515 @Override 1516 public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller, 1517 MasterProtos.DeleteColumnRequest request) throws ServiceException { 1518 return stub.deleteColumn(controller, request); 1519 } 1520 1521 @Override 1522 public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller, 1523 MasterProtos.ModifyColumnRequest request) throws ServiceException { 1524 return stub.modifyColumn(controller, request); 1525 } 1526 1527 @Override 1528 public MasterProtos.MoveRegionResponse moveRegion(RpcController controller, 1529 MasterProtos.MoveRegionRequest request) throws ServiceException { 1530 return stub.moveRegion(controller, request); 1531 } 1532 1533 @Override 1534 public MasterProtos.MergeTableRegionsResponse mergeTableRegions(RpcController controller, 1535 MasterProtos.MergeTableRegionsRequest request) throws 
ServiceException { 1536 return stub.mergeTableRegions(controller, request); 1537 } 1538 1539 @Override 1540 public MasterProtos.AssignRegionResponse assignRegion(RpcController controller, 1541 MasterProtos.AssignRegionRequest request) throws ServiceException { 1542 return stub.assignRegion(controller, request); 1543 } 1544 1545 @Override 1546 public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller, 1547 MasterProtos.UnassignRegionRequest request) throws ServiceException { 1548 return stub.unassignRegion(controller, request); 1549 } 1550 1551 @Override 1552 public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller, 1553 MasterProtos.OfflineRegionRequest request) throws ServiceException { 1554 return stub.offlineRegion(controller, request); 1555 } 1556 1557 @Override 1558 public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller, 1559 MasterProtos.SplitTableRegionRequest request) throws ServiceException { 1560 return stub.splitRegion(controller, request); 1561 } 1562 1563 @Override 1564 public MasterProtos.TruncateRegionResponse truncateRegion(RpcController controller, 1565 MasterProtos.TruncateRegionRequest request) throws ServiceException { 1566 return stub.truncateRegion(controller, request); 1567 } 1568 1569 @Override 1570 public MasterProtos.DeleteTableResponse deleteTable(RpcController controller, 1571 MasterProtos.DeleteTableRequest request) throws ServiceException { 1572 return stub.deleteTable(controller, request); 1573 } 1574 1575 @Override 1576 public MasterProtos.TruncateTableResponse truncateTable(RpcController controller, 1577 MasterProtos.TruncateTableRequest request) throws ServiceException { 1578 return stub.truncateTable(controller, request); 1579 } 1580 1581 @Override 1582 public MasterProtos.EnableTableResponse enableTable(RpcController controller, 1583 MasterProtos.EnableTableRequest request) throws ServiceException { 1584 return stub.enableTable(controller, request); 1585 
} 1586 1587 @Override 1588 public MasterProtos.DisableTableResponse disableTable(RpcController controller, 1589 MasterProtos.DisableTableRequest request) throws ServiceException { 1590 return stub.disableTable(controller, request); 1591 } 1592 1593 @Override 1594 public MasterProtos.ModifyTableResponse modifyTable(RpcController controller, 1595 MasterProtos.ModifyTableRequest request) throws ServiceException { 1596 return stub.modifyTable(controller, request); 1597 } 1598 1599 @Override 1600 public MasterProtos.CreateTableResponse createTable(RpcController controller, 1601 MasterProtos.CreateTableRequest request) throws ServiceException { 1602 return stub.createTable(controller, request); 1603 } 1604 1605 @Override 1606 public MasterProtos.ShutdownResponse shutdown(RpcController controller, 1607 MasterProtos.ShutdownRequest request) throws ServiceException { 1608 return stub.shutdown(controller, request); 1609 } 1610 1611 @Override 1612 public MasterProtos.StopMasterResponse stopMaster(RpcController controller, 1613 MasterProtos.StopMasterRequest request) throws ServiceException { 1614 return stub.stopMaster(controller, request); 1615 } 1616 1617 @Override 1618 public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode( 1619 final RpcController controller, final MasterProtos.IsInMaintenanceModeRequest request) 1620 throws ServiceException { 1621 return stub.isMasterInMaintenanceMode(controller, request); 1622 } 1623 1624 @Override 1625 public MasterProtos.BalanceResponse balance(RpcController controller, 1626 MasterProtos.BalanceRequest request) throws ServiceException { 1627 return stub.balance(controller, request); 1628 } 1629 1630 @Override 1631 public MasterProtos.SetBalancerRunningResponse setBalancerRunning(RpcController controller, 1632 MasterProtos.SetBalancerRunningRequest request) throws ServiceException { 1633 return stub.setBalancerRunning(controller, request); 1634 } 1635 1636 @Override 1637 public NormalizeResponse 
normalize(RpcController controller, NormalizeRequest request) 1638 throws ServiceException { 1639 return stub.normalize(controller, request); 1640 } 1641 1642 @Override 1643 public SetNormalizerRunningResponse setNormalizerRunning(RpcController controller, 1644 SetNormalizerRunningRequest request) throws ServiceException { 1645 return stub.setNormalizerRunning(controller, request); 1646 } 1647 1648 @Override 1649 public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller, 1650 MasterProtos.RunCatalogScanRequest request) throws ServiceException { 1651 return stub.runCatalogScan(controller, request); 1652 } 1653 1654 @Override 1655 public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor( 1656 RpcController controller, MasterProtos.EnableCatalogJanitorRequest request) 1657 throws ServiceException { 1658 return stub.enableCatalogJanitor(controller, request); 1659 } 1660 1661 @Override 1662 public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled( 1663 RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request) 1664 throws ServiceException { 1665 return stub.isCatalogJanitorEnabled(controller, request); 1666 } 1667 1668 @Override 1669 public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller, 1670 MasterProtos.RunCleanerChoreRequest request) throws ServiceException { 1671 return stub.runCleanerChore(controller, request); 1672 } 1673 1674 @Override 1675 public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning( 1676 RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request) 1677 throws ServiceException { 1678 return stub.setCleanerChoreRunning(controller, request); 1679 } 1680 1681 @Override 1682 public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled( 1683 RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request) 1684 throws ServiceException { 1685 return stub.isCleanerChoreEnabled(controller, 
request); 1686 } 1687 1688 @Override 1689 public ClientProtos.CoprocessorServiceResponse execMasterService(RpcController controller, 1690 ClientProtos.CoprocessorServiceRequest request) throws ServiceException { 1691 return stub.execMasterService(controller, request); 1692 } 1693 1694 @Override 1695 public MasterProtos.SnapshotResponse snapshot(RpcController controller, 1696 MasterProtos.SnapshotRequest request) throws ServiceException { 1697 return stub.snapshot(controller, request); 1698 } 1699 1700 @Override 1701 public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots( 1702 RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request) 1703 throws ServiceException { 1704 return stub.getCompletedSnapshots(controller, request); 1705 } 1706 1707 @Override 1708 public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller, 1709 MasterProtos.DeleteSnapshotRequest request) throws ServiceException { 1710 return stub.deleteSnapshot(controller, request); 1711 } 1712 1713 @Override 1714 public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller, 1715 MasterProtos.IsSnapshotDoneRequest request) throws ServiceException { 1716 return stub.isSnapshotDone(controller, request); 1717 } 1718 1719 @Override 1720 public MasterProtos.RestoreSnapshotResponse restoreSnapshot(RpcController controller, 1721 MasterProtos.RestoreSnapshotRequest request) throws ServiceException { 1722 return stub.restoreSnapshot(controller, request); 1723 } 1724 1725 @Override 1726 public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup(RpcController controller, 1727 MasterProtos.SetSnapshotCleanupRequest request) throws ServiceException { 1728 return stub.switchSnapshotCleanup(controller, request); 1729 } 1730 1731 @Override 1732 public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled( 1733 RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request) 1734 throws ServiceException 
{ 1735 return stub.isSnapshotCleanupEnabled(controller, request); 1736 } 1737 1738 @Override 1739 public MasterProtos.ExecProcedureResponse execProcedure(RpcController controller, 1740 MasterProtos.ExecProcedureRequest request) throws ServiceException { 1741 return stub.execProcedure(controller, request); 1742 } 1743 1744 @Override 1745 public MasterProtos.ExecProcedureResponse execProcedureWithRet(RpcController controller, 1746 MasterProtos.ExecProcedureRequest request) throws ServiceException { 1747 return stub.execProcedureWithRet(controller, request); 1748 } 1749 1750 @Override 1751 public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller, 1752 MasterProtos.IsProcedureDoneRequest request) throws ServiceException { 1753 return stub.isProcedureDone(controller, request); 1754 } 1755 1756 @Override 1757 public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller, 1758 MasterProtos.GetProcedureResultRequest request) throws ServiceException { 1759 return stub.getProcedureResult(controller, request); 1760 } 1761 1762 @Override 1763 public MasterProtos.IsMasterRunningResponse isMasterRunning(RpcController controller, 1764 MasterProtos.IsMasterRunningRequest request) throws ServiceException { 1765 return stub.isMasterRunning(controller, request); 1766 } 1767 1768 @Override 1769 public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller, 1770 MasterProtos.ModifyNamespaceRequest request) throws ServiceException { 1771 return stub.modifyNamespace(controller, request); 1772 } 1773 1774 @Override 1775 public MasterProtos.CreateNamespaceResponse createNamespace(RpcController controller, 1776 MasterProtos.CreateNamespaceRequest request) throws ServiceException { 1777 return stub.createNamespace(controller, request); 1778 } 1779 1780 @Override 1781 public MasterProtos.DeleteNamespaceResponse deleteNamespace(RpcController controller, 1782 MasterProtos.DeleteNamespaceRequest request) throws 
ServiceException { 1783 return stub.deleteNamespace(controller, request); 1784 } 1785 1786 @Override 1787 public MasterProtos.ListNamespacesResponse listNamespaces(RpcController controller, 1788 MasterProtos.ListNamespacesRequest request) throws ServiceException { 1789 return stub.listNamespaces(controller, request); 1790 } 1791 1792 @Override 1793 public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor( 1794 RpcController controller, MasterProtos.GetNamespaceDescriptorRequest request) 1795 throws ServiceException { 1796 return stub.getNamespaceDescriptor(controller, request); 1797 } 1798 1799 @Override 1800 public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors( 1801 RpcController controller, MasterProtos.ListNamespaceDescriptorsRequest request) 1802 throws ServiceException { 1803 return stub.listNamespaceDescriptors(controller, request); 1804 } 1805 1806 @Override 1807 public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace( 1808 RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request) 1809 throws ServiceException { 1810 return stub.listTableDescriptorsByNamespace(controller, request); 1811 } 1812 1813 @Override 1814 public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace( 1815 RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request) 1816 throws ServiceException { 1817 return stub.listTableNamesByNamespace(controller, request); 1818 } 1819 1820 @Override 1821 public MasterProtos.GetTableStateResponse getTableState(RpcController controller, 1822 MasterProtos.GetTableStateRequest request) throws ServiceException { 1823 return stub.getTableState(controller, request); 1824 } 1825 1826 @Override 1827 public void close() { 1828 release(this.mss); 1829 } 1830 1831 @Override 1832 public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus( 1833 RpcController controller, 
MasterProtos.GetSchemaAlterStatusRequest request) 1834 throws ServiceException { 1835 return stub.getSchemaAlterStatus(controller, request); 1836 } 1837 1838 @Override 1839 public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(RpcController controller, 1840 MasterProtos.GetTableDescriptorsRequest request) throws ServiceException { 1841 return stub.getTableDescriptors(controller, request); 1842 } 1843 1844 @Override 1845 public MasterProtos.ListTableDescriptorsByStateResponse listTableDescriptorsByState( 1846 RpcController controller, MasterProtos.ListTableDescriptorsByStateRequest request) 1847 throws ServiceException { 1848 return stub.listTableDescriptorsByState(controller, request); 1849 } 1850 1851 @Override 1852 public MasterProtos.GetTableNamesResponse getTableNames(RpcController controller, 1853 MasterProtos.GetTableNamesRequest request) throws ServiceException { 1854 return stub.getTableNames(controller, request); 1855 } 1856 1857 @Override 1858 public MasterProtos.FlushTableResponse flushTable(RpcController controller, 1859 MasterProtos.FlushTableRequest request) throws ServiceException { 1860 return stub.flushTable(controller, request); 1861 } 1862 1863 @Override 1864 public MasterProtos.ListTableNamesByStateResponse listTableNamesByState( 1865 RpcController controller, MasterProtos.ListTableNamesByStateRequest request) 1866 throws ServiceException { 1867 return stub.listTableNamesByState(controller, request); 1868 } 1869 1870 @Override 1871 public MasterProtos.GetClusterStatusResponse getClusterStatus(RpcController controller, 1872 MasterProtos.GetClusterStatusRequest request) throws ServiceException { 1873 return stub.getClusterStatus(controller, request); 1874 } 1875 1876 @Override 1877 public MasterProtos.SetQuotaResponse setQuota(RpcController controller, 1878 MasterProtos.SetQuotaRequest request) throws ServiceException { 1879 return stub.setQuota(controller, request); 1880 } 1881 1882 @Override 1883 public 
MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( 1884 RpcController controller, MasterProtos.MajorCompactionTimestampRequest request) 1885 throws ServiceException { 1886 return stub.getLastMajorCompactionTimestamp(controller, request); 1887 } 1888 1889 @Override 1890 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion( 1891 RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request) 1892 throws ServiceException { 1893 return stub.getLastMajorCompactionTimestampForRegion(controller, request); 1894 } 1895 1896 @Override 1897 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, 1898 IsBalancerEnabledRequest request) throws ServiceException { 1899 return stub.isBalancerEnabled(controller, request); 1900 } 1901 1902 @Override 1903 public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled( 1904 RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request) 1905 throws ServiceException { 1906 return stub.setSplitOrMergeEnabled(controller, request); 1907 } 1908 1909 @Override 1910 public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled( 1911 RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request) 1912 throws ServiceException { 1913 return stub.isSplitOrMergeEnabled(controller, request); 1914 } 1915 1916 @Override 1917 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller, 1918 IsNormalizerEnabledRequest request) throws ServiceException { 1919 return stub.isNormalizerEnabled(controller, request); 1920 } 1921 1922 @Override 1923 public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller, 1924 SecurityCapabilitiesRequest request) throws ServiceException { 1925 return stub.getSecurityCapabilities(controller, request); 1926 } 1927 1928 @Override 1929 public AddReplicationPeerResponse addReplicationPeer(RpcController controller, 1930 
AddReplicationPeerRequest request) throws ServiceException { 1931 return stub.addReplicationPeer(controller, request); 1932 } 1933 1934 @Override 1935 public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller, 1936 RemoveReplicationPeerRequest request) throws ServiceException { 1937 return stub.removeReplicationPeer(controller, request); 1938 } 1939 1940 @Override 1941 public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller, 1942 EnableReplicationPeerRequest request) throws ServiceException { 1943 return stub.enableReplicationPeer(controller, request); 1944 } 1945 1946 @Override 1947 public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller, 1948 DisableReplicationPeerRequest request) throws ServiceException { 1949 return stub.disableReplicationPeer(controller, request); 1950 } 1951 1952 @Override 1953 public ListDecommissionedRegionServersResponse listDecommissionedRegionServers( 1954 RpcController controller, ListDecommissionedRegionServersRequest request) 1955 throws ServiceException { 1956 return stub.listDecommissionedRegionServers(controller, request); 1957 } 1958 1959 @Override 1960 public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller, 1961 DecommissionRegionServersRequest request) throws ServiceException { 1962 return stub.decommissionRegionServers(controller, request); 1963 } 1964 1965 @Override 1966 public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller, 1967 RecommissionRegionServerRequest request) throws ServiceException { 1968 return stub.recommissionRegionServer(controller, request); 1969 } 1970 1971 @Override 1972 public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller, 1973 GetReplicationPeerConfigRequest request) throws ServiceException { 1974 return stub.getReplicationPeerConfig(controller, request); 1975 } 1976 1977 @Override 1978 public 
UpdateReplicationPeerConfigResponse updateReplicationPeerConfig( 1979 RpcController controller, UpdateReplicationPeerConfigRequest request) 1980 throws ServiceException { 1981 return stub.updateReplicationPeerConfig(controller, request); 1982 } 1983 1984 @Override 1985 public ListReplicationPeersResponse listReplicationPeers(RpcController controller, 1986 ListReplicationPeersRequest request) throws ServiceException { 1987 return stub.listReplicationPeers(controller, request); 1988 } 1989 1990 @Override 1991 public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller, 1992 GetReplicationPeerStateRequest request) throws ServiceException { 1993 return stub.isReplicationPeerEnabled(controller, request); 1994 } 1995 1996 @Override 1997 public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller, 1998 GetSpaceQuotaRegionSizesRequest request) throws ServiceException { 1999 return stub.getSpaceQuotaRegionSizes(controller, request); 2000 } 2001 2002 @Override 2003 public GetQuotaStatesResponse getQuotaStates(RpcController controller, 2004 GetQuotaStatesRequest request) throws ServiceException { 2005 return stub.getQuotaStates(controller, request); 2006 } 2007 2008 @Override 2009 public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller, 2010 MasterProtos.ClearDeadServersRequest request) throws ServiceException { 2011 return stub.clearDeadServers(controller, request); 2012 } 2013 2014 @Override 2015 public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, 2016 SwitchRpcThrottleRequest request) throws ServiceException { 2017 return stub.switchRpcThrottle(controller, request); 2018 } 2019 2020 @Override 2021 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, 2022 IsRpcThrottleEnabledRequest request) throws ServiceException { 2023 return stub.isRpcThrottleEnabled(controller, request); 2024 } 2025 2026 @Override 2027 public 
SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller, 2028 SwitchExceedThrottleQuotaRequest request) throws ServiceException { 2029 return stub.switchExceedThrottleQuota(controller, request); 2030 } 2031 2032 @Override 2033 public AccessControlProtos.GrantResponse grant(RpcController controller, 2034 AccessControlProtos.GrantRequest request) throws ServiceException { 2035 return stub.grant(controller, request); 2036 } 2037 2038 @Override 2039 public AccessControlProtos.RevokeResponse revoke(RpcController controller, 2040 AccessControlProtos.RevokeRequest request) throws ServiceException { 2041 return stub.revoke(controller, request); 2042 } 2043 2044 @Override 2045 public GetUserPermissionsResponse getUserPermissions(RpcController controller, 2046 GetUserPermissionsRequest request) throws ServiceException { 2047 return stub.getUserPermissions(controller, request); 2048 } 2049 2050 @Override 2051 public HasUserPermissionsResponse hasUserPermissions(RpcController controller, 2052 HasUserPermissionsRequest request) throws ServiceException { 2053 return stub.hasUserPermissions(controller, request); 2054 } 2055 2056 @Override 2057 public HBaseProtos.LogEntry getLogEntries(RpcController controller, 2058 HBaseProtos.LogRequest request) throws ServiceException { 2059 return stub.getLogEntries(controller, request); 2060 } 2061 2062 @Override 2063 public ModifyTableStoreFileTrackerResponse modifyTableStoreFileTracker( 2064 RpcController controller, ModifyTableStoreFileTrackerRequest request) 2065 throws ServiceException { 2066 return stub.modifyTableStoreFileTracker(controller, request); 2067 } 2068 2069 @Override 2070 public ModifyColumnStoreFileTrackerResponse modifyColumnStoreFileTracker( 2071 RpcController controller, ModifyColumnStoreFileTrackerRequest request) 2072 throws ServiceException { 2073 return stub.modifyColumnStoreFileTracker(controller, request); 2074 } 2075 2076 @Override 2077 public FlushMasterStoreResponse 
flushMasterStore(RpcController controller, 2078 FlushMasterStoreRequest request) throws ServiceException { 2079 return stub.flushMasterStore(controller, request); 2080 } 2081 2082 @Override 2083 public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch( 2084 RpcController controller, ReplicationPeerModificationSwitchRequest request) 2085 throws ServiceException { 2086 return stub.replicationPeerModificationSwitch(controller, request); 2087 } 2088 2089 @Override 2090 public GetReplicationPeerModificationProceduresResponse 2091 getReplicationPeerModificationProcedures(RpcController controller, 2092 GetReplicationPeerModificationProceduresRequest request) throws ServiceException { 2093 return stub.getReplicationPeerModificationProcedures(controller, request); 2094 } 2095 2096 @Override 2097 public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled( 2098 RpcController controller, IsReplicationPeerModificationEnabledRequest request) 2099 throws ServiceException { 2100 return stub.isReplicationPeerModificationEnabled(controller, request); 2101 } 2102 }; 2103 } 2104 2105 private static void release(MasterServiceState mss) { 2106 if (mss != null && mss.connection != null) { 2107 ((ConnectionImplementation) mss.connection).releaseMaster(mss); 2108 } 2109 } 2110 2111 private boolean isKeepAliveMasterConnectedAndRunning() { 2112 LOG.trace("Getting master connection state from TTL Cache"); 2113 return masterStateSupplier.get(); 2114 } 2115 2116 void releaseMaster(MasterServiceState mss) { 2117 if (mss.getStub() == null) { 2118 return; 2119 } 2120 synchronized (masterLock) { 2121 --mss.userCount; 2122 } 2123 } 2124 2125 private void closeMasterService(MasterServiceState mss) { 2126 if (mss.getStub() != null) { 2127 LOG.info("Closing master protocol: " + mss); 2128 mss.clearStub(); 2129 } 2130 mss.userCount = 0; 2131 } 2132 2133 /** 2134 * Immediate close of the shared master. 
Can be by the delayed close or when closing the 2135 * connection itself. 2136 */ 2137 private void closeMaster() { 2138 synchronized (masterLock) { 2139 closeMasterService(masterServiceState); 2140 } 2141 } 2142 2143 void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) { 2144 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); 2145 cacheLocation(hri.getTable(), source, newHrl); 2146 } 2147 2148 /** 2149 * Update the location with the new value (if the exception is a RegionMovedException) or delete 2150 * it from the cache. Does nothing if we can be sure from the exception that the location is still 2151 * accurate, or if the cache has already been updated. 2152 * @param exception an object (to simplify user code) on which we will try to find a nested or 2153 * wrapped or both RegionMovedException 2154 * @param source server that is the source of the location update. 2155 */ 2156 @Override 2157 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, 2158 final Object exception, final ServerName source) { 2159 if (rowkey == null || tableName == null) { 2160 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) 2161 + ", tableName=" + (tableName == null ? "null" : tableName)); 2162 return; 2163 } 2164 2165 if (source == null) { 2166 // This should not happen, but let's secure ourselves. 2167 return; 2168 } 2169 2170 if (regionName == null) { 2171 // we do not know which region, so just remove the cache entry for the row and server 2172 if (metrics != null) { 2173 metrics.incrCacheDroppingExceptions(exception); 2174 } 2175 metaCache.clearCache(tableName, rowkey, source); 2176 return; 2177 } 2178 2179 // Is it something we have already updated? 
2180 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey); 2181 HRegionLocation oldLocation = null; 2182 if (oldLocations != null) { 2183 oldLocation = oldLocations.getRegionLocationByRegionName(regionName); 2184 } 2185 if (oldLocation == null || !source.equals(oldLocation.getServerName())) { 2186 // There is no such location in the cache (it's been removed already) or 2187 // the cache has already been refreshed with a different location. => nothing to do 2188 return; 2189 } 2190 2191 RegionInfo regionInfo = oldLocation.getRegion(); 2192 Throwable cause = ClientExceptionsUtil.findException(exception); 2193 if (cause != null) { 2194 if (!ClientExceptionsUtil.isMetaClearingException(cause)) { 2195 // We know that the region is still on this region server 2196 return; 2197 } 2198 2199 if (cause instanceof RegionMovedException) { 2200 RegionMovedException rme = (RegionMovedException) cause; 2201 if (LOG.isTraceEnabled()) { 2202 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " 2203 + rme.getHostname() + ":" + rme.getPort() + " according to " + source.getAddress()); 2204 } 2205 // We know that the region is not anymore on this region server, but we know 2206 // the new location. 2207 updateCachedLocation(regionInfo, source, rme.getServerName(), rme.getLocationSeqNum()); 2208 return; 2209 } 2210 } 2211 2212 if (metrics != null) { 2213 metrics.incrCacheDroppingExceptions(exception); 2214 } 2215 2216 // Tell metaReplicaSelector that the location is stale. It will create a stale entry 2217 // with timestamp internally. Next time the client looks up the same location, 2218 // it will pick a different meta replica region. 2219 if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 2220 metaReplicaSelector.onError(oldLocation); 2221 } 2222 2223 // If we're here, it means that can cannot be sure about the location, so we remove it from 2224 // the cache. 
Do not send the source because source can be a new server in the same host:port 2225 metaCache.clearCache(regionInfo); 2226 } 2227 2228 @Override 2229 public AsyncProcess getAsyncProcess() { 2230 return asyncProcess; 2231 } 2232 2233 @Override 2234 public ServerStatisticTracker getStatisticsTracker() { 2235 return this.stats; 2236 } 2237 2238 @Override 2239 public ClientBackoffPolicy getBackoffPolicy() { 2240 return this.backoffPolicy; 2241 } 2242 2243 /* 2244 * Return the number of cached region for a table. It will only be called from a unit test. 2245 */ 2246 int getNumberOfCachedRegionLocations(final TableName tableName) { 2247 return metaCache.getNumberOfCachedRegionLocations(tableName); 2248 } 2249 2250 @Override 2251 public void abort(final String msg, Throwable t) { 2252 if (t != null) { 2253 LOG.error(HBaseMarkers.FATAL, msg, t); 2254 } else { 2255 LOG.error(HBaseMarkers.FATAL, msg); 2256 } 2257 this.aborted = true; 2258 close(); 2259 this.closed = true; 2260 } 2261 2262 @Override 2263 public boolean isClosed() { 2264 return this.closed; 2265 } 2266 2267 @Override 2268 public boolean isAborted() { 2269 return this.aborted; 2270 } 2271 2272 @Override 2273 public void close() { 2274 TraceUtil.trace(() -> { 2275 if (this.closed) { 2276 return; 2277 } 2278 closeMaster(); 2279 shutdownPools(); 2280 if (this.metrics != null) { 2281 MetricsConnection.deleteMetricsConnection(metricsScope); 2282 } 2283 this.closed = true; 2284 if (this.registry != null) { 2285 registry.close(); 2286 } 2287 this.stubs.clear(); 2288 if (clusterStatusListener != null) { 2289 clusterStatusListener.close(); 2290 } 2291 if (rpcClient != null) { 2292 rpcClient.close(); 2293 } 2294 synchronized (this) { 2295 if (choreService != null) { 2296 choreService.shutdown(); 2297 } 2298 } 2299 }, this.getClass().getSimpleName() + ".close"); 2300 } 2301 2302 /** 2303 * Close the connection for good. 
On the off chance that someone is unable to close the 2304 * connection, perhaps because it bailed out prematurely, the method below will ensure that this 2305 * instance is cleaned up. Caveat: The JVM may take an unknown amount of time to call finalize on 2306 * an unreachable object, so our hope is that every consumer cleans up after itself, like any good 2307 * citizen. 2308 */ 2309 @Override 2310 protected void finalize() throws Throwable { 2311 super.finalize(); 2312 close(); 2313 } 2314 2315 @Override 2316 public NonceGenerator getNonceGenerator() { 2317 return nonceGenerator; 2318 } 2319 2320 @Override 2321 public TableState getTableState(TableName tableName) throws IOException { 2322 checkClosed(); 2323 TableState tableState = MetaTableAccessor.getTableState(this, tableName); 2324 if (tableState == null) { 2325 throw new TableNotFoundException(tableName); 2326 } 2327 return tableState; 2328 } 2329 2330 @Override 2331 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { 2332 return RpcRetryingCallerFactory.instantiate(conf, connectionConfig, this.interceptor, 2333 this.getStatisticsTracker(), metrics); 2334 } 2335 2336 @Override 2337 public boolean hasCellBlockSupport() { 2338 return this.rpcClient.hasCellBlockSupport(); 2339 } 2340 2341 @Override 2342 public ConnectionConfiguration getConnectionConfiguration() { 2343 return this.connectionConfig; 2344 } 2345 2346 @Override 2347 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { 2348 return this.rpcCallerFactory; 2349 } 2350 2351 @Override 2352 public RpcControllerFactory getRpcControllerFactory() { 2353 return this.rpcControllerFactory; 2354 } 2355 2356 private static <T> T get(CompletableFuture<T> future) throws IOException { 2357 try { 2358 return future.get(); 2359 } catch (InterruptedException e) { 2360 Thread.currentThread().interrupt(); 2361 throw (IOException) new InterruptedIOException().initCause(e); 2362 } catch (ExecutionException e) { 2363 Throwable 
cause = e.getCause(); 2364 Throwables.propagateIfPossible(cause, IOException.class); 2365 throw new IOException(cause); 2366 } 2367 } 2368 2369 @Override 2370 public List<ServerName> getLiveRegionServers(Supplier<ServerName> masterAddrTracker, int count) 2371 throws IOException { 2372 RegionServerStatusService.BlockingInterface stub = RegionServerStatusService.newBlockingStub( 2373 rpcClient.createBlockingRpcChannel(masterAddrTracker.get(), user, rpcTimeout)); 2374 GetLiveRegionServersResponse resp; 2375 try { 2376 resp = stub.getLiveRegionServers(null, 2377 GetLiveRegionServersRequest.newBuilder().setCount(count).build()); 2378 } catch (ServiceException e) { 2379 Throwable t = ConnectionUtils.translateException(e); 2380 Throwables.propagateIfPossible(t, IOException.class); 2381 throw new IOException(t); 2382 } 2383 return resp.getServerList().stream().map(ProtobufUtil::toServerName) 2384 .collect(Collectors.toList()); 2385 } 2386 2387 @Override 2388 public List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException { 2389 BootstrapNodeService.BlockingInterface stub = BootstrapNodeService 2390 .newBlockingStub(rpcClient.createBlockingRpcChannel(regionServer, user, rpcTimeout)); 2391 GetAllBootstrapNodesResponse resp; 2392 try { 2393 resp = stub.getAllBootstrapNodes(null, GetAllBootstrapNodesRequest.getDefaultInstance()); 2394 } catch (ServiceException e) { 2395 Throwable t = ConnectionUtils.translateException(e); 2396 Throwables.propagateIfPossible(t, IOException.class); 2397 throw new IOException(t); 2398 } 2399 return resp.getNodeList().stream().map(ProtobufUtil::toServerName).collect(Collectors.toList()); 2400 } 2401 2402 @Override 2403 public String getClusterId() { 2404 try { 2405 return registry.getClusterId().get(); 2406 } catch (InterruptedException | ExecutionException e) { 2407 LOG.error("Error fetching cluster ID: ", e); 2408 } 2409 return null; 2410 } 2411}