001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
021import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_ENABLED;
022import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT;
023import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.HBASE_SERVER_NETTY_TLS_WRAP_SIZE;
024import static org.apache.hadoop.hbase.io.crypto.tls.X509Util.TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED;
025
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.net.InetSocketAddress;
029import java.net.SocketAddress;
030import java.security.cert.Certificate;
031import java.security.cert.X509Certificate;
032import java.util.List;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.atomic.AtomicReference;
035import javax.net.ssl.SSLPeerUnverifiedException;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.CellScanner;
038import org.apache.hadoop.hbase.HBaseInterfaceAudience;
039import org.apache.hadoop.hbase.Server;
040import org.apache.hadoop.hbase.exceptions.X509Exception;
041import org.apache.hadoop.hbase.io.FileChangeWatcher;
042import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
043import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
044import org.apache.hadoop.hbase.regionserver.HRegionServer;
045import org.apache.hadoop.hbase.security.HBasePolicyProvider;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
048import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
049import org.apache.hadoop.hbase.util.Pair;
050import org.apache.hadoop.hbase.util.ReflectionUtils;
051import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
058import org.apache.hbase.thirdparty.com.google.protobuf.Message;
059import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
060import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
061import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
062import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
063import org.apache.hbase.thirdparty.io.netty.channel.Channel;
064import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
065import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
066import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
067import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
068import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
069import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
070import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
071import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
072import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler;
073import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
074import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
075import org.apache.hbase.thirdparty.io.netty.util.concurrent.GlobalEventExecutor;
076
077/**
078 * An RPC server with Netty4 implementation.
079 * @since 2.0.0
080 */
081@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
082public class NettyRpcServer extends RpcServer {
083  public static final Logger LOG = LoggerFactory.getLogger(NettyRpcServer.class);
084
085  /**
086   * Name of property to change the byte buf allocator for the netty channels. Default is no value,
087   * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
088   * and "heap", or, the name of a class implementing ByteBufAllocator.
089   * <p>
090   * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
091   * controlled by platform specific code and documented system properties.
092   * <p>
093   * "heap" will prefer heap arena allocations.
094   */
095  public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
096  static final String POOLED_ALLOCATOR_TYPE = "pooled";
097  static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
098  static final String HEAP_ALLOCATOR_TYPE = "heap";
099
100  /**
101   * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was
102   * exceeded, channel will have setAutoRead to true again. The server will start reading incoming
103   * bytes (requests) from the client channel.
104   */
105  public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
106    "hbase.server.netty.writable.watermark.low";
107  private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
108
109  /**
110   * High watermark for pending outbound bytes of a single netty channel. If the number of pending
111   * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
112   * will stop reading incoming requests from the client channel.
113   * <p>
114   * Note: any requests already in the call queue will still be processed.
115   */
116  public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
117    "hbase.server.netty.writable.watermark.high";
118  private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
119
120  /**
121   * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
122   * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
123   * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
124   * requests.
125   * <p>
126   * Note: must be higher than the high watermark, otherwise it's ignored.
127   */
128  public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
129    "hbase.server.netty.writable.watermark.fatal";
130  private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;
131
132  private final InetSocketAddress bindAddress;
133
134  private final CountDownLatch closed = new CountDownLatch(1);
135  private final Channel serverChannel;
136  final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
137  private final ByteBufAllocator channelAllocator;
138  private final AtomicReference<SslContext> sslContextForServer = new AtomicReference<>();
139  private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
140  private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
141
142  private volatile int writeBufferFatalThreshold;
143  private volatile WriteBufferWaterMark writeBufferWaterMark;
144
145  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
146    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
147    boolean reservoirEnabled) throws IOException {
148    super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
149    this.bindAddress = bindAddress;
150    this.channelAllocator = getChannelAllocator(conf);
151    // Get the event loop group configuration from the server class if available.
152    NettyEventLoopGroupConfig config = null;
153    if (server instanceof HRegionServer) {
154      config = ((HRegionServer) server).getEventLoopGroupConfig();
155    }
156    if (config == null) {
157      config = new NettyEventLoopGroupConfig(conf, "NettyRpcServer");
158    }
159
160    // call before creating bootstrap below so that the necessary configs can be set
161    configureNettyWatermarks(conf);
162
163    EventLoopGroup eventLoopGroup = config.group();
164    Class<? extends ServerChannel> channelClass = config.serverChannelClass();
165    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
166      .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
167      .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
168      .childOption(ChannelOption.SO_REUSEADDR, true)
169      .childHandler(new ChannelInitializer<Channel>() {
170        @Override
171        protected void initChannel(Channel ch) throws Exception {
172          ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
173          ch.config().setAllocator(channelAllocator);
174          ChannelPipeline pipeline = ch.pipeline();
175
176          NettyServerRpcConnection conn = createNettyServerRpcConnection(ch);
177
178          if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
179            initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
180          }
181          pipeline
182            .addLast(NettyRpcServerPreambleHandler.DECODER_NAME,
183              NettyRpcServerPreambleHandler.createDecoder())
184            .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn))
185            // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
186            // send RpcResponse to client.
187            .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics))
188            // Add writability handler after the response encoder, so we can abort writes before
189            // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so
190            // that the handler configs can be live updated via update_config.
191            .addLast(NettyRpcServerChannelWritabilityHandler.NAME,
192              new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold,
193                () -> isWritabilityBackpressureEnabled()));
194        }
195      });
196    try {
197      serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
198      LOG.info("Bind to {}", serverChannel.localAddress());
199    } catch (InterruptedException e) {
200      throw new InterruptedIOException(e.getMessage());
201    }
202    initReconfigurable(conf);
203    this.scheduler.init(new RpcSchedulerContext(this));
204  }
205
206  @Override
207  public void onConfigurationChange(Configuration newConf) {
208    super.onConfigurationChange(newConf);
209    configureNettyWatermarks(newConf);
210  }
211
212  private void configureNettyWatermarks(Configuration conf) {
213    int watermarkLow =
214      conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
215    int watermarkHigh =
216      conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
217    int fatalThreshold =
218      conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
219
220    WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
221    int oldFatalThreshold = writeBufferFatalThreshold;
222
223    boolean disabled = false;
224    if (watermarkHigh == 0 && watermarkLow == 0) {
225      // if both are 0, use the netty default, which we will treat as "disabled".
226      // when disabled, we won't manage autoRead in response to writability changes.
227      writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
228      disabled = true;
229    } else {
230      // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to
231      // 1 to avoid confusing behavior.
232      if (watermarkLow == 0) {
233        LOG.warn(
234          "Detected a {} value of 0, which is impossible to achieve "
235            + "due to how netty evaluates these thresholds, setting to 1",
236          CHANNEL_WRITABLE_LOW_WATERMARK_KEY);
237        watermarkLow = 1;
238      }
239
240      // netty validates the watermarks and throws an exception if high < low, fail more gracefully
241      // by disabling the watermarks and warning.
242      if (watermarkHigh <= watermarkLow) {
243        LOG.warn(
244          "Detected {} value {}, lower than {} value {}. This will fail netty validation, "
245            + "so disabling",
246          CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
247          watermarkLow);
248        writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
249      } else {
250        writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh);
251      }
252
253      // only apply this check when watermark is enabled. this way we give the operator some
254      // flexibility if they want to try enabling fatal threshold without backpressure.
255      if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
256        LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.",
257          CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
258          watermarkHigh);
259        fatalThreshold = 0;
260      }
261    }
262
263    writeBufferFatalThreshold = fatalThreshold;
264
265    if (
266      oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
267        || oldWaterMark.high() != writeBufferWaterMark.high()
268        || oldFatalThreshold != writeBufferFatalThreshold)
269    ) {
270      LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}",
271        disabled ? "disabled" : writeBufferWaterMark.low(),
272        disabled ? "disabled" : writeBufferWaterMark.high(),
273        writeBufferFatalThreshold <= 0 ? "disabled" : writeBufferFatalThreshold);
274    }
275
276    // update any existing channels
277    for (Channel channel : allChannels) {
278      channel.config().setWriteBufferWaterMark(writeBufferWaterMark);
279      // if disabling watermark, set auto read to true in case channel had been exceeding
280      // previous watermark
281      if (disabled) {
282        channel.config().setAutoRead(true);
283      }
284    }
285  }
286
287  public boolean isWritabilityBackpressureEnabled() {
288    return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
289  }
290
291  private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
292    final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
293    if (value != null) {
294      if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
295        LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
296        return PooledByteBufAllocator.DEFAULT;
297      } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
298        LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
299        return UnpooledByteBufAllocator.DEFAULT;
300      } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
301        LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
302        return HeapByteBufAllocator.DEFAULT;
303      } else {
304        // If the value is none of the recognized labels, treat it as a class name. This allows the
305        // user to supply a custom implementation, perhaps for debugging.
306        try {
307          // ReflectionUtils throws UnsupportedOperationException if there are any problems.
308          ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
309          LOG.info("Using {} for buffer allocation", value);
310          return alloc;
311        } catch (ClassCastException | UnsupportedOperationException e) {
312          throw new IOException(e);
313        }
314      }
315    } else {
316      LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
317      return PooledByteBufAllocator.DEFAULT;
318    }
319  }
320
321  // will be overridden in tests
322  @InterfaceAudience.Private
323  protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
324    return new NettyServerRpcConnection(NettyRpcServer.this, channel);
325  }
326
327  @Override
328  public synchronized void start() {
329    if (started) {
330      return;
331    }
332    authTokenSecretMgr = createSecretManager();
333    if (authTokenSecretMgr != null) {
334      // Start AuthenticationTokenSecretManager in synchronized way to avoid race conditions in
335      // LeaderElector start. See HBASE-25875
336      synchronized (authTokenSecretMgr) {
337        setSecretManager(authTokenSecretMgr);
338        authTokenSecretMgr.start();
339      }
340    }
341    this.authManager = new ServiceAuthorizationManager();
342    HBasePolicyProvider.init(conf, authManager);
343    scheduler.start();
344    started = true;
345  }
346
347  @Override
348  public synchronized void stop() {
349    if (!running) {
350      return;
351    }
352    LOG.info("Stopping server on " + this.serverChannel.localAddress());
353    FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
354    if (ks != null) {
355      ks.stop();
356    }
357    FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
358    if (ts != null) {
359      ts.stop();
360    }
361    if (authTokenSecretMgr != null) {
362      authTokenSecretMgr.stop();
363      authTokenSecretMgr = null;
364    }
365    allChannels.close().awaitUninterruptibly();
366    serverChannel.close();
367    scheduler.stop();
368    closed.countDown();
369    running = false;
370  }
371
372  @Override
373  public synchronized void join() throws InterruptedException {
374    closed.await();
375  }
376
377  @Override
378  public synchronized InetSocketAddress getListenerAddress() {
379    return ((InetSocketAddress) serverChannel.localAddress());
380  }
381
382  @Override
383  public void setSocketSendBufSize(int size) {
384  }
385
386  @Override
387  public int getNumOpenConnections() {
388    return allChannels.size();
389  }
390
391  @Override
392  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
393    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
394    throws IOException {
395    return call(service, md, param, cellScanner, receiveTime, status,
396      EnvironmentEdgeManager.currentTime(), 0);
397  }
398
399  @Override
400  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
401    Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
402    long startTime, int timeout) throws IOException {
403    NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
404      -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null);
405    return call(fakeCall, status);
406  }
407
408  private void initSSL(ChannelPipeline p, NettyServerRpcConnection conn, boolean supportPlaintext)
409    throws X509Exception, IOException {
410    SslContext nettySslContext = getSslContext();
411
412    if (supportPlaintext) {
413      p.addLast("ssl", new OptionalSslHandler(nettySslContext));
414      LOG.debug("Dual mode SSL handler added for channel: {}", p.channel());
415    } else {
416      SocketAddress remoteAddress = p.channel().remoteAddress();
417      SslHandler sslHandler;
418
419      if (remoteAddress instanceof InetSocketAddress) {
420        InetSocketAddress remoteInetAddress = (InetSocketAddress) remoteAddress;
421        String host;
422
423        if (conf.getBoolean(TLS_CONFIG_REVERSE_DNS_LOOKUP_ENABLED, true)) {
424          host = remoteInetAddress.getHostName();
425        } else {
426          host = remoteInetAddress.getHostString();
427        }
428
429        int port = remoteInetAddress.getPort();
430
431        /*
432         * our HostnameVerifier gets the host name from SSLEngine, so we have to construct the
433         * engine properly by passing the remote address
434         */
435        sslHandler = nettySslContext.newHandler(p.channel().alloc(), host, port);
436      } else {
437        sslHandler = nettySslContext.newHandler(p.channel().alloc());
438      }
439
440      sslHandler.setWrapDataSize(
441        conf.getInt(HBASE_SERVER_NETTY_TLS_WRAP_SIZE, DEFAULT_HBASE_SERVER_NETTY_TLS_WRAP_SIZE));
442
443      sslHandler.handshakeFuture()
444        .addListener(future -> sslHandshakeCompleteHandler(conn, sslHandler, remoteAddress));
445
446      p.addLast("ssl", sslHandler);
447      LOG.debug("SSL handler added for channel: {}", p.channel());
448    }
449  }
450
451  static void sslHandshakeCompleteHandler(NettyServerRpcConnection conn, SslHandler sslHandler,
452    SocketAddress remoteAddress) {
453    try {
454      Certificate[] certificates = sslHandler.engine().getSession().getPeerCertificates();
455      if (certificates != null && certificates.length > 0) {
456        X509Certificate[] x509Certificates = new X509Certificate[certificates.length];
457        for (int i = 0; i < x509Certificates.length; i++) {
458          x509Certificates[i] = (X509Certificate) certificates[i];
459        }
460        conn.clientCertificateChain = x509Certificates;
461      } else if (sslHandler.engine().getNeedClientAuth()) {
462        LOG.debug(
463          "Could not get peer certificate on TLS connection from {}, although one is required",
464          remoteAddress);
465      }
466    } catch (SSLPeerUnverifiedException e) {
467      if (sslHandler.engine().getNeedClientAuth()) {
468        LOG.debug(
469          "Could not get peer certificate on TLS connection from {}, although one is required",
470          remoteAddress, e);
471      }
472    } catch (Exception e) {
473      LOG.debug("Unexpected error getting peer certificate for TLS connection from {}",
474        remoteAddress, e);
475    }
476  }
477
478  SslContext getSslContext() throws X509Exception, IOException {
479    SslContext result = sslContextForServer.get();
480    if (result == null) {
481      result = X509Util.createSslContextForServer(conf);
482      if (!sslContextForServer.compareAndSet(null, result)) {
483        // lost the race, another thread already set the value
484        result = sslContextForServer.get();
485      } else if (
486        keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
487          && conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
488      ) {
489        X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
490          () -> sslContextForServer.set(null));
491      }
492    }
493    return result;
494  }
495
496  public int getWriteBufferFatalThreshold() {
497    return writeBufferFatalThreshold;
498  }
499
500  public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
501    long total = 0;
502    long max = 0;
503    for (Channel channel : allChannels) {
504      long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
505      total += outbound;
506      max = Math.max(max, outbound);
507    }
508    return Pair.newPair(total, max);
509  }
510}