001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.net.SocketAddress;
022import java.util.Collections;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicReference;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.client.MetricsConnection;
029import org.apache.hadoop.hbase.exceptions.X509Exception;
030import org.apache.hadoop.hbase.io.FileChangeWatcher;
031import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
032import org.apache.hadoop.hbase.util.NettyFutureUtils;
033import org.apache.hadoop.hbase.util.Pair;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.io.netty.channel.Channel;
037import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
038import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
039import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
040import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
041import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
042
043/**
044 * Netty client for the requests and responses.
045 * @since 2.0.0
046 */
047@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
048public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
049
050  final EventLoopGroup group;
051
052  final Class<? extends Channel> channelClass;
053
054  private final boolean shutdownGroupWhenClose;
055  private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
056  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
057  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
058
059  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
060    MetricsConnection metrics) {
061    this(configuration, clusterId, localAddress, metrics, Collections.emptyMap());
062  }
063
064  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
065    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
066    super(configuration, clusterId, localAddress, metrics, connectionAttributes);
067    Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
068      NettyRpcClientConfigHelper.getEventLoopConfig(conf);
069    if (groupAndChannelClass == null) {
070      // Use our own EventLoopGroup.
071      int threadCount =
072        conf.getInt(NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0);
073      this.group = new NioEventLoopGroup(threadCount,
074        new DefaultThreadFactory("RPCClient(own)-NioEventLoopGroup", true, Thread.NORM_PRIORITY));
075      this.channelClass = NioSocketChannel.class;
076      this.shutdownGroupWhenClose = true;
077    } else {
078      this.group = groupAndChannelClass.getFirst();
079      this.channelClass = groupAndChannelClass.getSecond();
080      this.shutdownGroupWhenClose = false;
081    }
082  }
083
084  /** Used in test only. */
085  public NettyRpcClient(Configuration configuration) {
086    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, Collections.emptyMap());
087  }
088
089  @Override
090  protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {
091    return new NettyRpcConnection(this, remoteId);
092  }
093
094  @Override
095  protected void closeInternal() {
096    if (shutdownGroupWhenClose) {
097      NettyFutureUtils.consume(group.shutdownGracefully());
098    }
099    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
100    if (ks != null) {
101      ks.stop();
102    }
103    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
104    if (ts != null) {
105      ts.stop();
106    }
107  }
108
109  SslContext getSslContext() throws X509Exception, IOException {
110    SslContext result = sslContextForClient.get();
111    if (result == null) {
112      result = X509Util.createSslContextForClient(conf);
113      if (!sslContextForClient.compareAndSet(null, result)) {
114        // lost the race, another thread already set the value
115        result = sslContextForClient.get();
116      } else if (
117        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
118          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
119      ) {
120        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
121          () -> sslContextForClient.set(null));
122      }
123    }
124    return result;
125  }
126}