001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.util.HashMap; 021import java.util.concurrent.TimeUnit; 022import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; 023import org.apache.hadoop.hbase.util.NettyFutureUtils; 024import org.apache.yetus.audience.InterfaceAudience; 025 026import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 027import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; 028import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 029import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 030import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 031import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 032import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 033import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; 034 035/** 036 * Used to decode preamble calls. 037 */ 038@InterfaceAudience.Private 039class PreambleCallHandler extends SimpleChannelInboundHandler<ByteBuf> { 040 041 private final NettyRpcConnection conn; 042 043 private final byte[] preambleHeader; 044 045 private final Call preambleCall; 046 047 PreambleCallHandler(NettyRpcConnection conn, byte[] preambleHeader, Call preambleCall) { 048 this.conn = conn; 049 this.preambleHeader = preambleHeader; 050 this.preambleCall = preambleCall; 051 } 052 053 @Override 054 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 055 NettyFutureUtils.safeWriteAndFlush(ctx, 056 Unpooled.directBuffer(preambleHeader.length).writeBytes(preambleHeader)); 057 } 058 059 @Override 060 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { 061 try { 062 conn.readResponse(new ByteBufInputStream(buf), new HashMap<>(), preambleCall, 063 remoteExc -> exceptionCaught(ctx, remoteExc)); 064 } finally { 065 ChannelPipeline p = ctx.pipeline(); 066 p.remove("PreambleCallReadTimeoutHandler"); 067 p.remove("PreambleCallFrameDecoder"); 068 p.remove(this); 069 } 070 } 071 072 @Override 073 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 074 preambleCall.setException(new ConnectionClosedException("Connection closed")); 075 ctx.fireChannelInactive(); 076 } 077 078 @Override 079 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 080 preambleCall.setException(IPCUtil.toIOE(cause)); 081 } 082 083 public static void setup(ChannelPipeline pipeline, int readTimeoutMs, NettyRpcConnection conn, 084 byte[] preambleHeader, Call preambleCall) { 085 // we do not use single decode here, as for a preamble call, we do not expect the server side 086 // will return multiple responses 087 pipeline 088 .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallReadTimeoutHandler", 089 new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS)) 090 .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallFrameDecoder", 091 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)) 092 .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallHandler", 093 new PreambleCallHandler(conn, preambleHeader, preambleCall)); 094 } 095}