/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
 * The netty RPC handler for the client side. It serializes outbound {@link Call}s into request
 * frames and decodes inbound response frames back to their pending calls, which are tracked by
 * call id in {@link #id2Call}.
 * @since 2.0.0
 */
@InterfaceAudience.Private
class NettyRpcDuplexHandler extends ChannelDuplexHandler {
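
  // A rough sketch of where this handler sits in the channel pipeline, assuming the connection
  // installs a length-field based frame decoder ahead of it (the comment in readResponse below
  // relies on channelRead always seeing one complete response frame per ByteBuf):
  //
  //   inbound:  socket -> frame decoder -> NettyRpcDuplexHandler.channelRead
  //   outbound: NettyRpcDuplexHandler.write -> socket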

  private static final Logger LOG = LoggerFactory.getLogger(NettyRpcDuplexHandler.class);

  private final NettyRpcConnection conn;

  private final CellBlockBuilder cellBlockBuilder;

  private final Codec codec;

  private final CompressionCodec compressor;

  private final Map<Integer, Call> id2Call = new HashMap<>();

  public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
    Codec codec, CompressionCodec compressor) {
    this.conn = conn;
    this.cellBlockBuilder = cellBlockBuilder;
    this.codec = codec;
    this.compressor = compressor;
  }

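  /**
   * Serializes one outbound call into a request frame. A rough sketch of the frame layout this
   * method produces, derived from the write sequence below:
   *
   * <pre>
   * | totalSize (4 bytes) | delimited RequestHeader | delimited param (optional) | cell block (optional) |
   * </pre>
   *
   * {@code totalSize} counts everything after the 4-byte length prefix itself. When a cell block
   * is present, the header/param buffer and the cell block buffer are written separately and
   * their promises combined, so the caller's promise completes only after both writes succeed.
   */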
  private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
    throws IOException {
    id2Call.put(call.id, call);
    ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
    CellBlockMeta cellBlockMeta;
    if (cellBlock != null) {
      CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
      cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
      cellBlockMeta = cellBlockMetaBuilder.build();
    } else {
      cellBlockMeta = null;
    }
    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
    int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
    int totalSize =
      cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex() : sizeWithoutCellBlock;
    ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
    buf.writeInt(totalSize);
    try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf)) {
      requestHeader.writeDelimitedTo(bbos);
      if (call.param != null) {
        call.param.writeDelimitedTo(bbos);
      }
      if (cellBlock != null) {
        ChannelPromise withoutCellBlockPromise = ctx.newPromise();
        ctx.write(buf, withoutCellBlockPromise);
        ChannelPromise cellBlockPromise = ctx.newPromise();
        ctx.write(cellBlock, cellBlockPromise);
        PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        combiner.addAll((ChannelFuture) withoutCellBlockPromise, cellBlockPromise);
        combiner.finish(promise);
      } else {
        ctx.write(buf, promise);
      }
      call.callStats.setRequestSizeBytes(totalSize);
    }
  }

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
    throws Exception {
    if (msg instanceof Call) {
      Call call = (Call) msg;
      try (Scope scope = call.span.makeCurrent()) {
        writeRequest(ctx, call, promise);
      }
    } else {
      ctx.write(msg, promise);
    }
  }

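  /**
   * Decodes one complete response frame and dispatches it to the matching pending call in
   * {@link #id2Call}. The callback passed here routes remote exceptions to
   * {@link #exceptionCaught(ChannelHandlerContext, Throwable)}, tearing down the whole connection,
   * while per-response decoding problems are only logged.
   */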
  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
    try {
      conn.readResponse(new ByteBufInputStream(buf), id2Call, null,
        remoteExc -> exceptionCaught(ctx, remoteExc));
    } catch (IOException e) {
      // In netty, the decoding is frame based, so when we reach here we have already read a full
      // frame. Hitting an exception here does not mean the stream decoding is broken, thus we do
      // not need to throw the exception out and close the connection.
      LOG.warn("failed to process response", e);
    }
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
      ByteBuf buf = (ByteBuf) msg;
      try {
        readResponse(ctx, buf);
      } finally {
        buf.release();
      }
    } else {
      super.channelRead(ctx, msg);
    }
  }

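  /**
   * Fails every outstanding call with the given error and clears the tracking map. Called when
   * the channel goes inactive or an exception reaches this handler, as no pending call can
   * complete once the underlying connection is gone.
   */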
  private void cleanupCalls(IOException error) {
    for (Call call : id2Call.values()) {
      call.setException(error);
    }
    id2Call.clear();
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (!id2Call.isEmpty()) {
      cleanupCalls(new ConnectionClosedException("Connection closed"));
    }
    conn.shutdown();
    ctx.fireChannelInactive();
  }

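  // Any exception on the channel is treated as fatal for this connection: fail all pending calls
  // and shut the connection down. Note that the read path also routes remote exceptions here via
  // the callback passed to conn.readResponse above.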
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (!id2Call.isEmpty()) {
      cleanupCalls(IPCUtil.toIOE(cause));
    }
    conn.shutdown();
  }

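  // Handles two kinds of user events: IdleStateEvent, fired by an IdleStateHandler assumed to be
  // installed upstream in the pipeline, and CallEvent, fired when a call times out or is
  // cancelled and should no longer be tracked.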
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent idleEvt = (IdleStateEvent) evt;
      switch (idleEvt.state()) {
        case WRITER_IDLE:
          if (id2Call.isEmpty()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("shutdown connection to " + conn.remoteId().address
                + " because idle for a long time");
            }
            // It may happen that there are still some pending calls in the event loop queue and
            // they will get a closed channel exception. But this is not a big deal as it rarely
            // happens and the upper layer could retry immediately.
            conn.shutdown();
          }
          break;
        default:
          LOG.warn("Unrecognized idle state " + idleEvt.state());
          break;
      }
    } else if (evt instanceof CallEvent) {
      // Just remove the call for now, until we add call events other than timeout and cancel.
      id2Call.remove(((CallEvent) evt).call.id);
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}