001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute;
023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
025
026import java.io.IOException;
027import java.net.InetSocketAddress;
028import java.net.UnknownHostException;
029import java.util.Set;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ScheduledExecutorService;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.concurrent.TimeUnit;
034import javax.security.sasl.SaslException;
035import org.apache.hadoop.hbase.client.ConnectionUtils;
036import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
037import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
038import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
039import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
040import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
041import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
042import org.apache.hadoop.hbase.util.NettyFutureUtils;
043import org.apache.hadoop.hbase.util.Threads;
044import org.apache.hadoop.security.UserGroupInformation;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
051import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
052import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
053import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
054import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
055import org.apache.hbase.thirdparty.io.netty.channel.Channel;
056import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
057import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
058import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
060import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
061import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
062import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
063import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext;
064import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler;
065import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
066import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
067import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
068import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
069import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
070import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
071
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
073
074/**
075 * RPC connection implementation based on netty.
076 * <p/>
077 * Most operations are executed in handlers. Netty handler is always executed in the same
078 * thread(EventLoop) so no lock is needed.
079 * <p/>
080 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
081 * {@link #eventLoop} thread, otherwise there will be races.
082 * @since 2.0.0
083 */
084@InterfaceAudience.Private
085class NettyRpcConnection extends RpcConnection {
086
087  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
088
089  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
090    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
091      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
092
093  private final NettyRpcClient rpcClient;
094
095  // the event loop used to set up the connection, we will also execute other operations for this
096  // connection in this event loop, to avoid locking everywhere.
097  private final EventLoop eventLoop;
098
099  private ByteBuf connectionHeaderPreamble;
100
101  private ByteBuf connectionHeaderWithLength;
102
103  // make it volatile so in the isActive method below we do not need to switch to the event loop
104  // thread to access this field.
105  private volatile Channel channel;
106
107  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
108    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
109      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
110      rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes);
111    this.rpcClient = rpcClient;
112    this.eventLoop = rpcClient.group.next();
113    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
114    this.connectionHeaderPreamble =
115      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
116    ConnectionHeader header = getConnectionHeader();
117    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
118    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
119    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
120  }
121
122  @Override
123  protected void callTimeout(Call call) {
124    execute(eventLoop, () -> {
125      if (channel != null) {
126        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
127      }
128    });
129  }
130
131  @Override
132  public boolean isActive() {
133    return channel != null;
134  }
135
136  private void shutdown0() {
137    assert eventLoop.inEventLoop();
138    if (channel != null) {
139      NettyFutureUtils.consume(channel.close());
140      channel = null;
141    }
142  }
143
144  @Override
145  public void shutdown() {
146    execute(eventLoop, this::shutdown0);
147  }
148
149  @Override
150  public void cleanupConnection() {
151    execute(eventLoop, () -> {
152      if (connectionHeaderPreamble != null) {
153        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
154        connectionHeaderPreamble = null;
155      }
156      if (connectionHeaderWithLength != null) {
157        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
158        connectionHeaderWithLength = null;
159      }
160    });
161  }
162
163  private void established(Channel ch) {
164    assert eventLoop.inEventLoop();
165    ch.pipeline()
166      .addBefore(BufferCallBeforeInitHandler.NAME, null,
167        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
168      .addBefore(BufferCallBeforeInitHandler.NAME, null,
169        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
170      .addBefore(BufferCallBeforeInitHandler.NAME, null,
171        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
172      .fireUserEventTriggered(BufferCallEvent.success());
173  }
174
175  private void saslEstablished(Channel ch, String serverPrincipal) {
176    saslNegotiationDone(serverPrincipal, true);
177    established(ch);
178  }
179
180  private boolean reloginInProgress;
181
182  @SuppressWarnings("FutureReturnValueIgnored")
183  private void scheduleRelogin(Throwable error) {
184    assert eventLoop.inEventLoop();
185    if (error instanceof FallbackDisallowedException) {
186      return;
187    }
188    if (!provider.canRetry()) {
189      LOG.trace("SASL Provider does not support retries");
190      return;
191    }
192    if (reloginInProgress) {
193      return;
194    }
195    reloginInProgress = true;
196    RELOGIN_EXECUTOR.schedule(() -> {
197      try {
198        provider.relogin();
199      } catch (IOException e) {
200        LOG.warn("Relogin failed", e);
201      }
202      eventLoop.execute(() -> {
203        reloginInProgress = false;
204      });
205    }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
206  }
207
208  private void failInit(Channel ch, IOException e) {
209    assert eventLoop.inEventLoop();
210    // fail all pending calls
211    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
212    shutdown0();
213    rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), e);
214  }
215
216  private void saslFailInit(Channel ch, String serverPrincipal, IOException error) {
217    assert eventLoop.inEventLoop();
218    saslNegotiationDone(serverPrincipal, false);
219    failInit(ch, error);
220  }
221
222  private void saslNegotiate(Channel ch, String serverPrincipal) {
223    assert eventLoop.inEventLoop();
224    NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate());
225    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
226    if (ticket == null) {
227      saslFailInit(ch, serverPrincipal, new FatalConnectionException("ticket/user is null"));
228      return;
229    }
230    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
231    final NettyHBaseSaslRpcClientHandler saslHandler;
232    try {
233      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
234        ((InetSocketAddress) ch.remoteAddress()).getAddress(), serverPrincipal,
235        rpcClient.fallbackAllowed, this.rpcClient.conf);
236    } catch (IOException e) {
237      saslFailInit(ch, serverPrincipal, e);
238      return;
239    }
240    ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
241      .addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME,
242        saslHandler);
243    NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {
244
245      @Override
246      public void operationComplete(Future<Boolean> future) throws Exception {
247        if (future.isSuccess()) {
248          ChannelPipeline p = ch.pipeline();
249          // check if negotiate with server for connection header is necessary
250          if (saslHandler.isNeedProcessConnectionHeader()) {
251            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
252            // create the handler to handle the connection header
253            NettyHBaseRpcConnectionHeaderHandler chHandler =
254              new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
255                connectionHeaderWithLength);
256
257            // add ReadTimeoutHandler to deal with server doesn't response connection header
258            // because of the different configuration in client side and server side
259            final String readTimeoutHandlerName = "ReadTimeout";
260            p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
261              new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS))
262              .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
263            NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
264              @Override
265              public void operationComplete(Future<Boolean> future) throws Exception {
266                if (future.isSuccess()) {
267                  ChannelPipeline p = ch.pipeline();
268                  p.remove(readTimeoutHandlerName);
269                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
270                  // don't send connection header, NettyHBaseRpcConnectionHeaderHandler
271                  // sent it already
272                  saslEstablished(ch, serverPrincipal);
273                } else {
274                  final Throwable error = future.cause();
275                  scheduleRelogin(error);
276                  saslFailInit(ch, serverPrincipal, toIOE(error));
277                }
278              }
279            });
280          } else {
281            // send the connection header to server
282            ch.write(connectionHeaderWithLength.retainedDuplicate());
283            saslEstablished(ch, serverPrincipal);
284          }
285        } else {
286          final Throwable error = future.cause();
287          scheduleRelogin(error);
288          saslFailInit(ch, serverPrincipal, toIOE(error));
289        }
290      }
291    });
292  }
293
294  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
295    assert eventLoop.inEventLoop();
296    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
297      RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
298  }
299
300  private void onSecurityPreambleError(Channel ch, Set<String> serverPrincipals,
301    IOException error) {
302    assert eventLoop.inEventLoop();
303    LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address, error);
304    if (ConnectionUtils.isUnexpectedPreambleHeaderException(error)) {
305      // this means we are connecting to an old server which does not support the security
306      // preamble call, so we should fallback to randomly select a principal to use
307      // TODO: find a way to reconnect without failing all the pending calls, for now, when we
308      // reach here, shutdown should have already been scheduled
309      return;
310    }
311    if (IPCUtil.isSecurityNotEnabledException(error)) {
312      // server tells us security is not enabled, then we should check whether fallback to
313      // simple is allowed, if so we just go without security, otherwise we should fail the
314      // negotiation immediately
315      if (rpcClient.fallbackAllowed) {
316        // TODO: just change the preamble and skip the fallback to simple logic, for now, just
317        // select the first principal can finish the connection setup, but waste one client
318        // message
319        saslNegotiate(ch, serverPrincipals.iterator().next());
320      } else {
321        failInit(ch, new FallbackDisallowedException());
322      }
323      return;
324    }
325    // usually we should not reach here, but for robust, just randomly select a principal to
326    // connect
327    saslNegotiate(ch, randomSelect(serverPrincipals));
328  }
329
330  private void onSecurityPreambleFinish(Channel ch, Set<String> serverPrincipals,
331    Call securityPreambleCall) {
332    assert eventLoop.inEventLoop();
333    String serverPrincipal;
334    try {
335      serverPrincipal = chooseServerPrincipal(serverPrincipals, securityPreambleCall);
336    } catch (SaslException e) {
337      failInit(ch, e);
338      return;
339    }
340    saslNegotiate(ch, serverPrincipal);
341  }
342
343  private void saslNegotiate(Channel ch) throws IOException {
344    assert eventLoop.inEventLoop();
345    Set<String> serverPrincipals = getServerPrincipals();
346    if (serverPrincipals.size() == 1) {
347      saslNegotiate(ch, serverPrincipals.iterator().next());
348      return;
349    }
350    // this means we use kerberos authentication and there are multiple server principal candidates,
351    // in this way we need to send a special preamble header to get server principal from server
352    Call securityPreambleCall = createSecurityPreambleCall(call -> {
353      assert eventLoop.inEventLoop();
354      if (call.error != null) {
355        onSecurityPreambleError(ch, serverPrincipals, call.error);
356      } else {
357        onSecurityPreambleFinish(ch, serverPrincipals, call);
358      }
359    });
360    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
361      RpcClient.SECURITY_PREAMBLE_HEADER, securityPreambleCall);
362  }
363
364  private void connect(Call connectionRegistryCall) throws UnknownHostException {
365    assert eventLoop.inEventLoop();
366    LOG.trace("Connecting to {}", remoteId.getAddress());
367    InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
368    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
369      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
370      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
371      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
372      .handler(new ChannelInitializer<Channel>() {
373        @Override
374        protected void initChannel(Channel ch) throws Exception {
375          if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
376            SslContext sslContext = rpcClient.getSslContext();
377            SslHandler sslHandler = sslContext.newHandler(ch.alloc(),
378              remoteId.address.getHostName(), remoteId.address.getPort());
379            sslHandler.setHandshakeTimeoutMillis(
380              conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT,
381                X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS));
382            ch.pipeline().addFirst(sslHandler);
383            LOG.debug("SSL handler added with handshake timeout {} ms",
384              sslHandler.getHandshakeTimeoutMillis());
385          }
386          ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
387            new BufferCallBeforeInitHandler());
388        }
389      }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
390      .addListener(new ChannelFutureListener() {
391
392        private void succeed(Channel ch) throws IOException {
393          if (connectionRegistryCall != null) {
394            getConnectionRegistry(ch, connectionRegistryCall);
395            return;
396          }
397          if (!useSasl) {
398            // BufferCallBeforeInitHandler will call ctx.flush when receiving the
399            // BufferCallEvent.success() event, so here we just use write for the below two messages
400            NettyFutureUtils.safeWrite(ch, connectionHeaderPreamble.retainedDuplicate());
401            NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
402            established(ch);
403          } else {
404            saslNegotiate(ch);
405          }
406        }
407
408        private void fail(Channel ch, Throwable error) {
409          IOException ex = toIOE(error);
410          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
411            ex);
412          if (connectionRegistryCall != null) {
413            connectionRegistryCall.setException(ex);
414          }
415          failInit(ch, ex);
416        }
417
418        @Override
419        public void operationComplete(ChannelFuture future) throws Exception {
420          Channel ch = future.channel();
421          if (!future.isSuccess()) {
422            fail(ch, future.cause());
423            return;
424          }
425          SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
426          if (sslHandler != null) {
427            NettyFutureUtils.addListener(sslHandler.handshakeFuture(), f -> {
428              if (f.isSuccess()) {
429                succeed(ch);
430              } else {
431                fail(ch, f.cause());
432              }
433            });
434          } else {
435            succeed(ch);
436          }
437        }
438      }).channel();
439  }
440
441  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
442    assert eventLoop.inEventLoop();
443    if (call.isConnectionRegistryCall()) {
444      // For get connection registry call, we will send a special preamble header to get the
445      // response, instead of sending a real rpc call. See HBASE-25051
446      connect(call);
447      return;
448    }
449    if (reloginInProgress) {
450      throw new IOException(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS);
451    }
452    hrc.notifyOnCancel(new RpcCallback<Object>() {
453
454      @Override
455      public void run(Object parameter) {
456        setCancelled(call);
457        if (channel != null) {
458          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
459        }
460      }
461    }, new CancellationCallback() {
462
463      @Override
464      public void run(boolean cancelled) throws IOException {
465        if (cancelled) {
466          setCancelled(call);
467        } else {
468          if (channel == null) {
469            connect(null);
470          }
471          scheduleTimeoutTask(call);
472          NettyFutureUtils.addListener(channel.writeAndFlush(call), new ChannelFutureListener() {
473            @Override
474            public void operationComplete(ChannelFuture future) throws Exception {
475              // Fail the call if we failed to write it out. This usually because the channel is
476              // closed. This is needed because we may shutdown the channel inside event loop and
477              // there may still be some pending calls in the event loop queue after us.
478              if (!future.isSuccess()) {
479                call.setException(toIOE(future.cause()));
480              }
481            }
482          });
483        }
484      }
485    });
486  }
487
488  @Override
489  public void sendRequest(final Call call, HBaseRpcController hrc) {
490    execute(eventLoop, () -> {
491      try {
492        sendRequest0(call, hrc);
493      } catch (Exception e) {
494        call.setException(toIOE(e));
495      }
496    });
497  }
498}