001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import io.opentelemetry.context.Scope; 021import java.io.IOException; 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.hadoop.hbase.codec.Codec; 025import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 026import org.apache.hadoop.io.compress.CompressionCodec; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 032import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; 033import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 034import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 035import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; 036import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 037import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 038import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 039import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; 043 044/** 045 * The netty rpc handler. 046 * @since 2.0.0 047 */ 048@InterfaceAudience.Private 049class NettyRpcDuplexHandler extends ChannelDuplexHandler { 050 051 private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class); 052 053 private final NettyRpcConnection conn; 054 055 private final CellBlockBuilder cellBlockBuilder; 056 057 private final Codec codec; 058 059 private final CompressionCodec compressor; 060 061 private final Map<Integer, Call> id2Call = new HashMap<>(); 062 063 public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder, 064 Codec codec, CompressionCodec compressor) { 065 this.conn = conn; 066 this.cellBlockBuilder = cellBlockBuilder; 067 this.codec = codec; 068 this.compressor = compressor; 069 070 } 071 072 private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise) 073 throws IOException { 074 id2Call.put(call.id, call); 075 ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc()); 076 CellBlockMeta cellBlockMeta; 077 if (cellBlock != null) { 078 CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder(); 079 cellBlockMetaBuilder.setLength(cellBlock.writerIndex()); 080 cellBlockMeta = cellBlockMetaBuilder.build(); 081 } else { 082 cellBlockMeta = null; 083 } 084 RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta); 085 int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param); 086 int totalSize = 087 cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() : sizeWithoutCellBlock; 088 ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4); 089 buf.writeInt(totalSize); 090 try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) { 091 requestHeader.writeDelimitedTo(bbos); 092 if (call.param != null) { 093 call.param.writeDelimitedTo(bbos); 094 } 095 if (cellBlock != null) { 096 ChannelPromise withoutCellBlockPromise = ctx.newPromise(); 097 ctx.write(buf, withoutCellBlockPromise); 098 ChannelPromise cellBlockPromise = ctx.newPromise(); 099 ctx.write(cellBlock, cellBlockPromise); 100 PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); 101 combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise); 102 combiner.finish(promise); 103 } else { 104 ctx.write(buf, promise); 105 } 106 call.callStats.setRequestSizeBytes(totalSize); 107 } 108 } 109 110 @Override 111 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 112 throws Exception { 113 if (msg instanceof Call) { 114 Call call = (Call) msg; 115 try (Scope scope = call.span.makeCurrent()) { 116 writeRequest(ctx, call, promise); 117 } 118 } else { 119 ctx.write(msg, promise); 120 } 121 } 122 123 private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException { 124 try { 125 conn.readResponse(new ByteBufInputStream(buf), id2Call, null, 126 remoteExc -> exceptionCaught(ctx, remoteExc)); 127 } catch (IOException e) { 128 // In netty, the decoding the frame based, when reaching here we have already read a full 129 // frame, so hitting exception here does not mean the stream decoding is broken, thus we do 130 // not need to throw the exception out and close the connection. 131 LOG.warn("failed to process response", e); 132 } 133 } 134 135 @Override 136 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 137 if (msg instanceof ByteBuf) { 138 ByteBuf buf = (ByteBuf) msg; 139 try { 140 readResponse(ctx, buf); 141 } finally { 142 buf.release(); 143 } 144 } else { 145 super.channelRead(ctx, msg); 146 } 147 } 148 149 private void cleanupCalls(IOException error) { 150 for (Call call : id2Call.values()) { 151 call.setException(error); 152 } 153 id2Call.clear(); 154 } 155 156 @Override 157 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 158 if (!id2Call.isEmpty()) { 159 cleanupCalls(new ConnectionClosedException("Connection closed")); 160 } 161 conn.shutdown(); 162 ctx.fireChannelInactive(); 163 } 164 165 @Override 166 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 167 if (!id2Call.isEmpty()) { 168 cleanupCalls(IPCUtil.toIOE(cause)); 169 } 170 conn.shutdown(); 171 } 172 173 @Override 174 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 175 if (evt instanceof IdleStateEvent) { 176 IdleStateEvent idleEvt = (IdleStateEvent) evt; 177 switch (idleEvt.state()) { 178 case WRITER_IDLE: 179 if (id2Call.isEmpty()) { 180 if (LOG.isTraceEnabled()) { 181 LOG.trace("shutdown connection to " + conn.remoteId().address 182 + " because idle for a long time"); 183 } 184 // It may happen that there are still some pending calls in the event loop queue and 185 // they will get a closed channel exception. But this is not a big deal as it rarely 186 // rarely happens and the upper layer could retry immediately. 187 conn.shutdown(); 188 } 189 break; 190 default: 191 LOG.warn("Unrecognized idle state " + idleEvt.state()); 192 break; 193 } 194 } else if (evt instanceof CallEvent) { 195 // just remove the call for now until we add other call event other than timeout and cancel. 196 id2Call.remove(((CallEvent) evt).call.id); 197 } else { 198 ctx.fireUserEventTriggered(evt); 199 } 200 } 201}