001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.CompletableFuture; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.ipc.HBaseRpcController; 028import org.apache.hadoop.hbase.ipc.RpcClient; 029import org.apache.hadoop.hbase.ipc.RpcClientFactory; 030import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 031import org.apache.hadoop.hbase.security.User; 032import org.apache.hadoop.hbase.util.FutureUtils; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 038import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 039 040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; 043 044/** 045 * Fetch cluster id through special preamble header. 046 * <p> 047 * An instance of this class should only be used once, like: 048 * 049 * <pre> 050 * new ClusterIdFetcher().fetchClusterId() 051 * </pre> 052 * 053 * Calling the fetchClusterId multiple times will lead unexpected behavior. 054 * <p> 055 * See HBASE-25051 for more details. 056 */ 057@InterfaceAudience.Private 058class ClusterIdFetcher { 059 060 private static final Logger LOG = LoggerFactory.getLogger(ClusterIdFetcher.class); 061 062 private final List<ServerName> bootstrapServers; 063 064 private final User user; 065 066 private final RpcClient rpcClient; 067 068 private final RpcControllerFactory rpcControllerFactory; 069 070 private final CompletableFuture<String> future; 071 072 ClusterIdFetcher(Configuration conf, User user, RpcControllerFactory rpcControllerFactory, 073 Set<ServerName> bootstrapServers) { 074 this.user = user; 075 // use null cluster id here as we do not know the cluster id yet, we will fetch it through this 076 // rpc client 077 this.rpcClient = RpcClientFactory.createClient(conf, null); 078 this.rpcControllerFactory = rpcControllerFactory; 079 this.bootstrapServers = new ArrayList<ServerName>(bootstrapServers); 080 // shuffle the bootstrap servers so we will not always fetch from the same one 081 Collections.shuffle(this.bootstrapServers); 082 future = new CompletableFuture<String>(); 083 } 084 085 /** 086 * Try get cluster id from the server with the given {@code index} in {@link #bootstrapServers}. 087 */ 088 private void getClusterId(int index) { 089 ServerName server = bootstrapServers.get(index); 090 LOG.debug("Going to request {} for getting cluster id", server); 091 // user and rpcTimeout are both not important here, as we will not actually send any rpc calls 092 // out, only a preamble connection header, but if we pass null as user, there will be NPE in 093 // some code paths... 094 RpcChannel channel = rpcClient.createRpcChannel(server, user, 0); 095 ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel); 096 HBaseRpcController controller = rpcControllerFactory.newController(); 097 stub.getConnectionRegistry(controller, GetConnectionRegistryRequest.getDefaultInstance(), 098 new RpcCallback<GetConnectionRegistryResponse>() { 099 100 @Override 101 public void run(GetConnectionRegistryResponse resp) { 102 if (!controller.failed()) { 103 LOG.debug("Got connection registry info: {}", resp); 104 future.complete(resp.getClusterId()); 105 return; 106 } 107 if (ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) { 108 // this means we have connected to an old server where it does not support passing 109 // cluster id through preamble connnection header, so we fallback to use null 110 // cluster id, which is the old behavior 111 LOG.debug("Failed to get connection registry info, should be an old server," 112 + " fallback to use null cluster id", controller.getFailed()); 113 future.complete(null); 114 } else { 115 LOG.debug("Failed to get connection registry info", controller.getFailed()); 116 if (index == bootstrapServers.size() - 1) { 117 future.completeExceptionally(controller.getFailed()); 118 } else { 119 // try next bootstrap server 120 getClusterId(index + 1); 121 } 122 } 123 } 124 }); 125 126 } 127 128 CompletableFuture<String> fetchClusterId() { 129 getClusterId(0); 130 // close the rpc client after we finish the request 131 FutureUtils.addListener(future, (r, e) -> rpcClient.close()); 132 return future; 133 } 134}