001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import javax.net.ssl.SSLException;
024import org.apache.hadoop.hbase.util.NettyFutureUtils;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
030import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
031import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
032
033/**
034 * We will expose the connection to upper layer before initialized, so we need to buffer the calls
035 * passed in and write them out once the connection is established.
036 */
037@InterfaceAudience.Private
038class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
039
040  private static final Logger LOG = LoggerFactory.getLogger(BufferCallBeforeInitHandler.class);
041
042  static final String NAME = "BufferCall";
043
044  private enum BufferCallAction {
045    FLUSH,
046    FAIL
047  }
048
049  public static final class BufferCallEvent {
050
051    public final BufferCallAction action;
052
053    public final IOException error;
054
055    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
056      IOException error) {
057      this.action = action;
058      this.error = error;
059    }
060
061    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
062      return SUCCESS_EVENT;
063    }
064
065    public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
066      return new BufferCallEvent(BufferCallAction.FAIL, error);
067    }
068  }
069
070  private static final BufferCallEvent SUCCESS_EVENT =
071    new BufferCallEvent(BufferCallAction.FLUSH, null);
072
073  private final Map<Integer, Call> id2Call = new HashMap<>();
074
075  @Override
076  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
077    if (msg instanceof Call) {
078      Call call = (Call) msg;
079      id2Call.put(call.id, call);
080      // The call is already in track so here we set the write operation as success.
081      // We will fail the call directly if we can not write it out.
082      promise.trySuccess();
083    } else {
084      NettyFutureUtils.consume(ctx.write(msg, promise));
085    }
086  }
087
088  @Override
089  public void flush(ChannelHandlerContext ctx) throws Exception {
090    // do not flush anything out
091  }
092
093  @Override
094  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
095    if (evt instanceof BufferCallEvent) {
096      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
097      switch (bcEvt.action) {
098        case FLUSH:
099          for (Call call : id2Call.values()) {
100            NettyFutureUtils.safeWrite(ctx, call);
101          }
102          ctx.flush();
103          ctx.pipeline().remove(this);
104          break;
105        case FAIL:
106          for (Call call : id2Call.values()) {
107            call.setException(bcEvt.error);
108          }
109          // here we do not remove us from the pipeline, for receiving possible exceptions and log
110          // it, especially the ssl exceptions, to prevent it reaching the tail of the pipeline and
111          // generate a confusing netty WARN
112          break;
113      }
114    } else if (evt instanceof CallEvent) {
115      // just remove the call for now until we add other call event other than timeout and cancel.
116      id2Call.remove(((CallEvent) evt).call.id);
117    } else {
118      ctx.fireUserEventTriggered(evt);
119    }
120  }
121
122  private boolean isSslError(Throwable cause) {
123    Throwable error = cause;
124    do {
125      if (error instanceof SSLException) {
126        return true;
127      }
128      error = error.getCause();
129    } while (error != null);
130    return false;
131  }
132
133  @Override
134  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
135    if (isSslError(cause)) {
136      // this should have been logged in other places, see HBASE-27782 for more details.
137      // here we just log it with debug and tell users that this is not a critical problem,
138      // otherwise if we just pass it through the pipeline, it will lead to a confusing
139      // "An exceptionCaught() event was fired, and it reached at the tail of the pipeline"
140      LOG.debug(
141        "got ssl exception, which should have already been proceeded, log it here to"
142          + " prevent it being passed to netty's TailContext and trigger a confusing WARN message",
143        cause);
144    } else {
145      ctx.fireExceptionCaught(cause);
146    }
147  }
148}