001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; 022import org.apache.hadoop.hbase.util.NettyFutureUtils; 023import org.apache.yetus.audience.InterfaceAudience; 024 025import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 026import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 027import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 028import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 029import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 030 031import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 032 033/** 034 * Implement logic to deal with the rpc connection header. 035 * @since 2.0.0 036 */ 037@InterfaceAudience.Private 038public class NettyHBaseRpcConnectionHeaderHandler extends SimpleChannelInboundHandler<ByteBuf> { 039 040 private final Promise<Boolean> saslPromise; 041 042 private final Configuration conf; 043 044 private final ByteBuf connectionHeaderWithLength; 045 046 public NettyHBaseRpcConnectionHeaderHandler(Promise<Boolean> saslPromise, Configuration conf, 047 ByteBuf connectionHeaderWithLength) { 048 this.saslPromise = saslPromise; 049 this.conf = conf; 050 this.connectionHeaderWithLength = connectionHeaderWithLength; 051 } 052 053 @Override 054 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 055 // read the ConnectionHeaderResponse from server 056 int len = msg.readInt(); 057 byte[] buff = new byte[len]; 058 msg.readBytes(buff); 059 060 RPCProtos.ConnectionHeaderResponse connectionHeaderResponse = 061 RPCProtos.ConnectionHeaderResponse.parseFrom(buff); 062 063 // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher 064 if (connectionHeaderResponse.hasCryptoCipherMeta()) { 065 CryptoAES cryptoAES = 066 EncryptionUtil.createCryptoAES(connectionHeaderResponse.getCryptoCipherMeta(), conf); 067 // replace the Sasl handler with Crypto AES handler 068 setupCryptoAESHandler(ctx.pipeline(), cryptoAES); 069 } 070 NettyFutureUtils.consume(saslPromise.setSuccess(true)); 071 } 072 073 @Override 074 public void handlerAdded(ChannelHandlerContext ctx) { 075 try { 076 // send the connection header to server first 077 NettyFutureUtils.safeWriteAndFlush(ctx, connectionHeaderWithLength.retainedDuplicate()); 078 } catch (Exception e) { 079 // the exception thrown by handlerAdded will not be passed to the exceptionCaught below 080 // because netty will remove a handler if handlerAdded throws an exception. 081 exceptionCaught(ctx, e); 082 } 083 } 084 085 @Override 086 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 087 saslPromise.tryFailure(cause); 088 } 089 090 /** 091 * Remove handlers for sasl encryption and add handlers for Crypto AES encryption 092 */ 093 private void setupCryptoAESHandler(ChannelPipeline p, CryptoAES cryptoAES) { 094 p.replace(SaslWrapHandler.class, null, new SaslWrapHandler(cryptoAES::wrap)); 095 p.replace(SaslUnwrapHandler.class, null, new SaslUnwrapHandler(cryptoAES::unwrap)); 096 } 097}