001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED; 021import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT; 022import static org.apache.hadoop.hbase.ipc.IPCUtil.execute; 023import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; 024import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; 025 026import java.io.IOException; 027import java.net.InetSocketAddress; 028import java.net.UnknownHostException; 029import java.util.Set; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadLocalRandom; 033import java.util.concurrent.TimeUnit; 034import javax.security.sasl.SaslException; 035import org.apache.hadoop.hbase.client.ConnectionUtils; 036import org.apache.hadoop.hbase.io.crypto.tls.X509Util; 037import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; 038import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; 039import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; 040import 
org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; 041import org.apache.hadoop.hbase.security.SaslChallengeDecoder; 042import org.apache.hadoop.hbase.util.NettyFutureUtils; 043import org.apache.hadoop.hbase.util.Threads; 044import org.apache.hadoop.security.UserGroupInformation; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 051import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; 052import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 053import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 054import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 055import org.apache.hbase.thirdparty.io.netty.channel.Channel; 056import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 057import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; 058import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; 059import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; 060import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 061import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; 062import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 063import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; 064import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; 065import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 066import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; 067import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; 068import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; 069import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; 070import 
org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;

/**
 * RPC connection implementation based on netty.
 * <p/>
 * Most operations are executed in handlers. Netty handlers are always executed in the same
 * thread (EventLoop), so no lock is needed.
 * <p/>
 * <strong>Implementation assumptions:</strong> All the private methods should be called in the
 * {@link #eventLoop} thread, otherwise there will be races.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcConnection extends RpcConnection {

  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

  // Shared, daemon, single-threaded scheduler used to run delayed SASL relogin attempts off the
  // event loop.
  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
      .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private final NettyRpcClient rpcClient;

  // the event loop used to set up the connection, we will also execute other operations for this
  // connection in this event loop, to avoid locking everywhere.
  private final EventLoop eventLoop;

  // Pre-serialized connection preamble (direct buffer); written as a retained duplicate on every
  // connect and released in cleanupConnection.
  private ByteBuf connectionHeaderPreamble;

  // Pre-serialized connection header prefixed with its 4-byte length (direct buffer); written as a
  // retained duplicate on every connect and released in cleanupConnection.
  private ByteBuf connectionHeaderWithLength;

  // make it volatile so in the isActive method below we do not need to switch to the event loop
  // thread to access this field.
  private volatile Channel channel;

  /**
   * Creates a connection bound to one event loop of the client's group and pre-serializes the
   * connection preamble and the length-prefixed connection header into direct buffers.
   * @param rpcClient the owning client, source of configuration, codec and event loop group
   * @param remoteId  identifies the remote server and user this connection is for
   * @throws IOException if the connection header cannot be serialized; any buffer allocated so far
   *           is released before the exception propagates
   */
  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
      rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes);
    this.rpcClient = rpcClient;
    this.eventLoop = rpcClient.group.next();
    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
    this.connectionHeaderPreamble =
      Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
    try {
      ConnectionHeader header = getConnectionHeader();
      int headerSize = header.getSerializedSize();
      // 4-byte length prefix followed by the serialized header
      this.connectionHeaderWithLength = Unpooled.directBuffer(4 + headerSize);
      this.connectionHeaderWithLength.writeInt(headerSize);
      header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
    } catch (IOException | RuntimeException e) {
      // No reference to a half-constructed connection escapes, so cleanupConnection can never run
      // for it: release the direct buffers here instead of leaking them.
      ReferenceCountUtil.safeRelease(this.connectionHeaderPreamble);
      if (this.connectionHeaderWithLength != null) {
        ReferenceCountUtil.safeRelease(this.connectionHeaderWithLength);
      }
      throw e;
    }
  }

  /**
   * Switches to the event loop and, if a channel is currently open, fires a TIMEOUT
   * {@link CallEvent} for {@code call} through its pipeline.
   */
  @Override
  protected void callTimeout(Call call) {
    execute(eventLoop, () -> {
      if (channel != null) {
        channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
      }
    });
  }

  /**
   * Returns whether a channel is currently set. Reads the volatile field directly, without
   * switching to the event loop.
   */
  @Override
  public boolean isActive() {
    return channel != null;
  }

  /** Closes the current channel, if any, and clears the field. Must run on the event loop. */
  private void shutdown0() {
    assert eventLoop.inEventLoop();
    if (channel != null) {
      NettyFutureUtils.consume(channel.close());
      channel = null;
    }
  }

  /** Asynchronously closes the current channel on the event loop. */
  @Override
  public void shutdown() {
    execute(eventLoop, this::shutdown0);
  }

  /** Releases the pre-serialized preamble/header buffers on the event loop (at most once each). */
  @Override
  public void cleanupConnection() {
    execute(eventLoop, () -> {
      if (connectionHeaderPreamble != null) {
        ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
        connectionHeaderPreamble = null;
      }
      if (connectionHeaderWithLength != null) {
        ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
        connectionHeaderWithLength = null;
      }
    });
  }

  /**
   * Finishes connection setup. Inserts, in front of the {@link BufferCallBeforeInitHandler}, a
   * writer-idle {@link IdleStateHandler} (rpcClient.minIdleTimeBeforeClose ms), a frame decoder
   * reading a 4-byte length prefix at offset 0, and the {@link NettyRpcDuplexHandler}. It then
   * fires the success event so buffered calls are released.
   */
  private void established(Channel ch) {
    assert eventLoop.inEventLoop();
    ch.pipeline()
      .addBefore(BufferCallBeforeInitHandler.NAME, null,
        new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
      .addBefore(BufferCallBeforeInitHandler.NAME, null,
        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
      .addBefore(BufferCallBeforeInitHandler.NAME, null,
        new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
      .fireUserEventTriggered(BufferCallEvent.success());
  }

  /** Records a successful SASL negotiation with {@code serverPrincipal}, then establishes. */
  private void saslEstablished(Channel ch, String serverPrincipal) {
    saslNegotiationDone(serverPrincipal, true);
    established(ch);
  }

  // Only accessed on the event loop. True while a relogin task is scheduled or running; new
  // requests fail fast in sendRequest0 and no second relogin is scheduled meanwhile.
  private boolean reloginInProgress;

  /**
   * Schedules an asynchronous relogin on {@link #RELOGIN_EXECUTOR} after a random delay below
   * {@code reloginMaxBackoff} milliseconds. Nothing is scheduled when the error is a
   * {@link FallbackDisallowedException}, the SASL provider cannot retry, or a relogin is already
   * pending.
   * <p/>
   * The in-progress flag is always cleared on the event loop once the attempt finishes, whether
   * the relogin succeeded or threw. This keeps a failing provider from blocking the connection
   * forever. A non-positive {@code reloginMaxBackoff} is treated as "no delay" rather than making
   * {@link ThreadLocalRandom#nextInt(int)} throw with the flag already set.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private void scheduleRelogin(Throwable error) {
    assert eventLoop.inEventLoop();
    if (error instanceof FallbackDisallowedException) {
      return;
    }
    if (!provider.canRetry()) {
      LOG.trace("SASL Provider does not support retries");
      return;
    }
    if (reloginInProgress) {
      return;
    }
    // Compute the delay before setting the flag: nextInt rejects a non-positive bound.
    int delayMs =
      reloginMaxBackoff > 0 ? ThreadLocalRandom.current().nextInt(reloginMaxBackoff) : 0;
    reloginInProgress = true;
    RELOGIN_EXECUTOR.schedule(() -> {
      try {
        provider.relogin();
      } catch (IOException | RuntimeException e) {
        LOG.warn("Relogin failed", e);
      } finally {
        // reset on the event loop, the only thread that reads/writes this flag
        eventLoop.execute(() -> {
          reloginInProgress = false;
        });
      }
    }, delayMs, TimeUnit.MILLISECONDS);
  }

  /**
   * Fails connection setup. Fails all calls buffered in the pipeline with {@code e}, closes the
   * current channel, and adds the remote address to the client's failed-servers list.
   */
  private void failInit(Channel ch, IOException e) {
    assert eventLoop.inEventLoop();
    // fail all pending calls
    ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
    shutdown0();
    rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), e);
  }

  /** Records a failed SASL negotiation with {@code serverPrincipal}, then fails setup. */
  private void saslFailInit(Channel ch, String serverPrincipal, IOException error) {
    assert eventLoop.inEventLoop();
    saslNegotiationDone(serverPrincipal, false);
    failInit(ch, error);
  }

  /**
   * Runs SASL negotiation against {@code serverPrincipal}.
   * <p/>
   * Writes and flushes the preamble, then installs a {@link SaslChallengeDecoder} and a
   * {@link NettyHBaseSaslRpcClientHandler} in front of the buffering handler. When negotiation
   * succeeds, either the connection-header handshake is run (if the SASL handler asks for it) or
   * the connection header is written directly, and the connection is established. On any failure
   * a relogin is scheduled (if applicable) and setup fails.
   */
  private void saslNegotiate(Channel ch, String serverPrincipal) {
    assert eventLoop.inEventLoop();
    NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate());
    UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
    if (ticket == null) {
      saslFailInit(ch, serverPrincipal, new FatalConnectionException("ticket/user is null"));
      return;
    }
    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
    final NettyHBaseSaslRpcClientHandler saslHandler;
    try {
      saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
        ((InetSocketAddress) ch.remoteAddress()).getAddress(), serverPrincipal,
        rpcClient.fallbackAllowed, this.rpcClient.conf);
    } catch (IOException e) {
      saslFailInit(ch, serverPrincipal, e);
      return;
    }
    ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
      .addBefore(BufferCallBeforeInitHandler.NAME, NettyHBaseSaslRpcClientHandler.HANDLER_NAME,
        saslHandler);
    NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {

      @Override
      public void operationComplete(Future<Boolean> future) throws Exception {
        if (future.isSuccess()) {
          ChannelPipeline p = ch.pipeline();
          // check if negotiate with server for connection header is necessary
          if (saslHandler.isNeedProcessConnectionHeader()) {
            Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
            // create the handler to handle the connection header
            NettyHBaseRpcConnectionHeaderHandler chHandler =
              new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
                connectionHeaderWithLength);

            // Add a ReadTimeoutHandler so we do not wait forever if the server does not respond
            // to the connection header (e.g. because client and server configurations differ).
            final String readTimeoutHandlerName = "ReadTimeout";
            p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
              new ReadTimeoutHandler(rpcClient.readTO, TimeUnit.MILLISECONDS))
              .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
            NettyFutureUtils.addListener(connectionHeaderPromise, new FutureListener<Boolean>() {
              @Override
              public void operationComplete(Future<Boolean> future) throws Exception {
                if (future.isSuccess()) {
                  ChannelPipeline p = ch.pipeline();
                  p.remove(readTimeoutHandlerName);
                  p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
                  // don't send connection header, NettyHBaseRpcConnectionHeaderHandler
                  // sent it already
                  saslEstablished(ch, serverPrincipal);
                } else {
                  final Throwable error = future.cause();
                  scheduleRelogin(error);
                  saslFailInit(ch, serverPrincipal, toIOE(error));
                }
              }
            });
          } else {
            // send the connection header to server; it is flushed when established() fires the
            // BufferCallEvent.success() event
            NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
            saslEstablished(ch, serverPrincipal);
          }
        } else {
          final Throwable error = future.cause();
          scheduleRelogin(error);
          saslFailInit(ch, serverPrincipal, toIOE(error));
        }
      }
    });
  }

  /** Installs the preamble call handler that serves the connection-registry call. */
  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
    assert eventLoop.inEventLoop();
    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
      RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
  }

  /**
   * Handles a failed security preamble call. The reaction depends on the error: give up for
   * unexpected-preamble-header errors, fall back or fail for security-not-enabled errors, and
   * negotiate with a randomly selected principal otherwise.
   */
  private void onSecurityPreambleError(Channel ch, Set<String> serverPrincipals,
    IOException error) {
    assert eventLoop.inEventLoop();
    LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address, error);
    if (ConnectionUtils.isUnexpectedPreambleHeaderException(error)) {
      // this means we are connecting to an old server which does not support the security
      // preamble call, so we should fallback to randomly select a principal to use
      // TODO: find a way to reconnect without failing all the pending calls, for now, when we
      // reach here, shutdown should have already been scheduled
      return;
    }
    if (IPCUtil.isSecurityNotEnabledException(error)) {
      // server tells us security is not enabled, then we should check whether fallback to
      // simple is allowed, if so we just go without security, otherwise we should fail the
      // negotiation immediately
      if (rpcClient.fallbackAllowed) {
        // TODO: just change the preamble and skip the fallback to simple logic, for now, just
        // select the first principal can finish the connection setup, but waste one client
        // message
        saslNegotiate(ch, serverPrincipals.iterator().next());
      } else {
        failInit(ch, new FallbackDisallowedException());
      }
      return;
    }
    // usually we should not reach here, but for robustness, just randomly select a principal to
    // connect
    saslNegotiate(ch, randomSelect(serverPrincipals));
  }

  /**
   * Picks the server principal based on the security preamble response and negotiates with it.
   * Setup fails if no principal can be chosen.
   */
  private void onSecurityPreambleFinish(Channel ch, Set<String> serverPrincipals,
    Call securityPreambleCall) {
    assert eventLoop.inEventLoop();
    String serverPrincipal;
    try {
      serverPrincipal = chooseServerPrincipal(serverPrincipals, securityPreambleCall);
    } catch (SaslException e) {
      failInit(ch, e);
      return;
    }
    saslNegotiate(ch, serverPrincipal);
  }

  /**
   * Starts SASL negotiation. With a single candidate principal it negotiates directly; otherwise
   * it first sends a security preamble call so the server can tell which principal to use.
   */
  private void saslNegotiate(Channel ch) throws IOException {
    assert eventLoop.inEventLoop();
    Set<String> serverPrincipals = getServerPrincipals();
    if (serverPrincipals.size() == 1) {
      saslNegotiate(ch, serverPrincipals.iterator().next());
      return;
    }
    // this means we use kerberos authentication and there are multiple server principal candidates,
    // in this way we need to send a special preamble header to get server principal from server
    Call securityPreambleCall = createSecurityPreambleCall(call -> {
      assert eventLoop.inEventLoop();
      if (call.error != null) {
        onSecurityPreambleError(ch, serverPrincipals, call.error);
      } else {
        onSecurityPreambleFinish(ch, serverPrincipals, call);
      }
    });
    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
      RpcClient.SECURITY_PREAMBLE_HEADER, securityPreambleCall);
  }

  /**
   * Opens a new channel to the remote address and assigns it to {@link #channel} immediately.
   * <p/>
   * When TLS is enabled, an {@link SslHandler} is installed first and setup continues only after
   * its handshake succeeds. Setup then does one of three things: serves
   * {@code connectionRegistryCall} when non-null, negotiates SASL when {@code useSasl} is set, or
   * writes the preamble and header and establishes the connection. Any failure is logged, set on
   * the registry call (if any), and fails setup.
   * @param connectionRegistryCall the connection-registry call to serve, or {@code null} for a
   *          regular RPC connection
   */
  private void connect(Call connectionRegistryCall) throws UnknownHostException {
    assert eventLoop.inEventLoop();
    LOG.trace("Connecting to {}", remoteId.getAddress());
    InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
    this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
      .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
      .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
      .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
          if (conf.getBoolean(X509Util.HBASE_CLIENT_NETTY_TLS_ENABLED, false)) {
            SslContext sslContext = rpcClient.getSslContext();
            SslHandler sslHandler = sslContext.newHandler(ch.alloc(),
              remoteId.address.getHostName(), remoteId.address.getPort());
            sslHandler.setHandshakeTimeoutMillis(
              conf.getInt(X509Util.HBASE_CLIENT_NETTY_TLS_HANDSHAKETIMEOUT,
                X509Util.DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS));
            ch.pipeline().addFirst(sslHandler);
            LOG.debug("SSL handler added with handshake timeout {} ms",
              sslHandler.getHandshakeTimeoutMillis());
          }
          ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
            new BufferCallBeforeInitHandler());
        }
      }).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
      .addListener(new ChannelFutureListener() {

        // Continues setup once the TCP connection (and TLS handshake, if any) is up.
        private void succeed(Channel ch) throws IOException {
          if (connectionRegistryCall != null) {
            getConnectionRegistry(ch, connectionRegistryCall);
            return;
          }
          if (!useSasl) {
            // BufferCallBeforeInitHandler will call ctx.flush when receiving the
            // BufferCallEvent.success() event, so here we just use write for the below two messages
            NettyFutureUtils.safeWrite(ch, connectionHeaderPreamble.retainedDuplicate());
            NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate());
            established(ch);
          } else {
            saslNegotiate(ch);
          }
        }

        // Logs the failure, propagates it to the registry call (if any) and fails setup.
        private void fail(Channel ch, Throwable error) {
          IOException ex = toIOE(error);
          LOG.warn("Exception encountered while connecting to the server {}",
            remoteId.getAddress(), ex);
          if (connectionRegistryCall != null) {
            connectionRegistryCall.setException(ex);
          }
          failInit(ch, ex);
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          Channel ch = future.channel();
          if (!future.isSuccess()) {
            fail(ch, future.cause());
            return;
          }
          SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
          if (sslHandler != null) {
            NettyFutureUtils.addListener(sslHandler.handshakeFuture(), f -> {
              if (f.isSuccess()) {
                succeed(ch);
              } else {
                fail(ch, f.cause());
              }
            });
          } else {
            succeed(ch);
          }
        }
      }).channel();
  }

  /**
   * Sends {@code call} on the event loop.
   * <p/>
   * Connection-registry calls open a dedicated connection instead of sending an RPC. Otherwise
   * the cancel callback and the continue callback are registered on {@code hrc}. If the call has
   * not been cancelled, the continue callback connects when there is no channel, schedules the
   * call's timeout and writes it; a failed write fails the call.
   * @throws IOException if a relogin is currently in progress
   */
  private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException {
    assert eventLoop.inEventLoop();
    if (call.isConnectionRegistryCall()) {
      // For get connection registry call, we will send a special preamble header to get the
      // response, instead of sending a real rpc call. See HBASE-25051
      connect(call);
      return;
    }
    if (reloginInProgress) {
      throw new IOException(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS);
    }
    hrc.notifyOnCancel(new RpcCallback<Object>() {

      @Override
      public void run(Object parameter) {
        setCancelled(call);
        if (channel != null) {
          channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
        }
      }
    }, new CancellationCallback() {

      @Override
      public void run(boolean cancelled) throws IOException {
        if (cancelled) {
          setCancelled(call);
        } else {
          if (channel == null) {
            connect(null);
          }
          scheduleTimeoutTask(call);
          NettyFutureUtils.addListener(channel.writeAndFlush(call), new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              // Fail the call if we failed to write it out. This usually because the channel is
              // closed. This is needed because we may shutdown the channel inside event loop and
              // there may still be some pending calls in the event loop queue after us.
              if (!future.isSuccess()) {
                call.setException(toIOE(future.cause()));
              }
            }
          });
        }
      }
    });
  }

  /**
   * Runs {@link #sendRequest0} on the event loop. Any exception it throws is converted to an
   * {@link IOException} and set on {@code call}.
   */
  @Override
  public void sendRequest(final Call call, HBaseRpcController hrc) {
    execute(eventLoop, () -> {
      try {
        sendRequest0(call, hrc);
      } catch (Exception e) {
        call.setException(toIOE(e));
      }
    });
  }
}