001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.InetAddress;
024import java.net.Socket;
025import java.nio.ByteBuffer;
026import java.nio.channels.Channels;
027import java.nio.channels.ReadableByteChannel;
028import java.nio.channels.SocketChannel;
029import java.util.concurrent.ConcurrentLinkedDeque;
030import java.util.concurrent.atomic.LongAdder;
031import java.util.concurrent.locks.Lock;
032import java.util.concurrent.locks.ReentrantLock;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.ExtendedCellScanner;
035import org.apache.hadoop.hbase.client.VersionInfoUtil;
036import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
037import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
038import org.apache.hadoop.hbase.nio.ByteBuff;
039import org.apache.hadoop.hbase.nio.SingleByteBuff;
040import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
041import org.apache.hadoop.hbase.security.SaslStatus;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.io.BytesWritable;
044import org.apache.yetus.audience.InterfaceAudience;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
047import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
048import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
049import org.apache.hbase.thirdparty.com.google.protobuf.Message;
050
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
053
054/** Reads calls from a connection and queues them for handling. */
055@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
056    justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
057@Deprecated
058@InterfaceAudience.Private
059class SimpleServerRpcConnection extends ServerRpcConnection {
060
061  final SocketChannel channel;
062  private ByteBuff data;
063  private ByteBuffer dataLengthBuffer;
064  private ByteBuffer preambleBuffer;
065  private final LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
066  private long lastContact;
067  private final Socket socket;
068  final SimpleRpcServerResponder responder;
069
070  // If initial preamble with version and magic has been read or not.
071  private boolean connectionPreambleRead = false;
072  private boolean saslContextEstablished;
073  private ByteBuffer unwrappedData;
074  // When is this set? FindBugs wants to know! Says NP
075  private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
076  boolean useWrap = false;
077
078  final ConcurrentLinkedDeque<RpcResponse> responseQueue = new ConcurrentLinkedDeque<>();
079  final Lock responseWriteLock = new ReentrantLock();
080  long lastSentTime = -1L;
081
082  public SimpleServerRpcConnection(SimpleRpcServer rpcServer, SocketChannel channel,
083    long lastContact) {
084    super(rpcServer);
085    this.channel = channel;
086    this.lastContact = lastContact;
087    this.data = null;
088    this.dataLengthBuffer = ByteBuffer.allocate(4);
089    this.socket = channel.socket();
090    this.addr = socket.getInetAddress();
091    if (addr == null) {
092      this.hostAddress = "*Unknown*";
093    } else {
094      this.hostAddress = addr.getHostAddress();
095    }
096    this.remotePort = socket.getPort();
097    if (rpcServer.socketSendBufferSize != 0) {
098      try {
099        socket.setSendBufferSize(rpcServer.socketSendBufferSize);
100      } catch (IOException e) {
101        SimpleRpcServer.LOG.warn(
102          "Connection: unable to set socket send buffer size to " + rpcServer.socketSendBufferSize);
103      }
104    }
105    this.responder = rpcServer.responder;
106  }
107
108  public void setLastContact(long lastContact) {
109    this.lastContact = lastContact;
110  }
111
112  public long getLastContact() {
113    return lastContact;
114  }
115
116  /* Return true if the connection has no outstanding rpc */
117  boolean isIdle() {
118    return rpcCount.sum() == 0;
119  }
120
121  /* Decrement the outstanding RPC count */
122  protected void decRpcCount() {
123    rpcCount.decrement();
124  }
125
126  /* Increment the outstanding RPC count */
127  protected void incRpcCount() {
128    rpcCount.increment();
129  }
130
131  private int readPreamble() throws IOException {
132    if (preambleBuffer == null) {
133      preambleBuffer = ByteBuffer.allocate(6);
134    }
135    int count = this.rpcServer.channelRead(channel, preambleBuffer);
136    if (count < 0 || preambleBuffer.remaining() > 0) {
137      return count;
138    }
139    preambleBuffer.flip();
140    PreambleResponse resp = processPreamble(preambleBuffer);
141    switch (resp) {
142      case SUCCEED:
143        preambleBuffer = null; // do not need it anymore
144        connectionPreambleRead = true;
145        return count;
146      case CONTINUE:
147        // wait for the next preamble header
148        preambleBuffer.clear();
149        return count;
150      case CLOSE:
151        return -1;
152      default:
153        throw new IllegalArgumentException("Unknown preamble response: " + resp);
154    }
155  }
156
157  private int read4Bytes() throws IOException {
158    if (this.dataLengthBuffer.remaining() > 0) {
159      return this.rpcServer.channelRead(channel, this.dataLengthBuffer);
160    } else {
161      return 0;
162    }
163  }
164
165  private void processUnwrappedData(byte[] inBuf) throws IOException, InterruptedException {
166    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
167    // Read all RPCs contained in the inBuf, even partial ones
168    while (true) {
169      int count;
170      if (unwrappedDataLengthBuffer.remaining() > 0) {
171        count = this.rpcServer.channelRead(ch, unwrappedDataLengthBuffer);
172        if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0) {
173          return;
174        }
175      }
176
177      if (unwrappedData == null) {
178        unwrappedDataLengthBuffer.flip();
179        int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
180
181        if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
182          if (RpcServer.LOG.isDebugEnabled()) RpcServer.LOG.debug("Received ping message");
183          unwrappedDataLengthBuffer.clear();
184          continue; // ping message
185        }
186        unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
187      }
188
189      count = this.rpcServer.channelRead(ch, unwrappedData);
190      if (count <= 0 || unwrappedData.remaining() > 0) {
191        return;
192      }
193
194      if (unwrappedData.remaining() == 0) {
195        unwrappedDataLengthBuffer.clear();
196        unwrappedData.flip();
197        processOneRpc(new SingleByteBuff(unwrappedData));
198        unwrappedData = null;
199      }
200    }
201  }
202
203  private void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException {
204    if (saslContextEstablished) {
205      RpcServer.LOG.trace("Read input token of size={} for processing by saslServer.unwrap()",
206        saslToken.limit());
207      if (!useWrap) {
208        processOneRpc(saslToken);
209      } else {
210        byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
211        byte[] plaintextData = saslServer.unwrap(b, 0, b.length);
212        // release the request buffer as we have already unwrapped all its content
213        callCleanupIfNeeded();
214        processUnwrappedData(plaintextData);
215      }
216    } else {
217      byte[] replyToken;
218      try {
219        try {
220          getOrCreateSaslServer();
221        } catch (Exception e) {
222          RpcServer.LOG.error("Error when trying to create instance of HBaseSaslRpcServer "
223            + "with sasl provider: " + provider, e);
224          throw e;
225        }
226        RpcServer.LOG.debug("Created SASL server with mechanism={}",
227          provider.getSaslAuthMethod().getAuthMethod());
228        RpcServer.LOG.debug(
229          "Read input token of size={} for processing by saslServer." + "evaluateResponse()",
230          saslToken.limit());
231        replyToken = saslServer
232          .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
233      } catch (IOException e) {
234        RpcServer.LOG.debug("Failed to execute SASL handshake", e);
235        Throwable sendToClient = HBaseSaslRpcServer.unwrap(e);
236        doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
237          sendToClient.getLocalizedMessage());
238        this.rpcServer.metrics.authenticationFailure();
239        String clientIP = this.toString();
240        // attempting user could be null
241        RpcServer.AUDITLOG.warn("{}{}: {}", RpcServer.AUTH_FAILED_FOR, clientIP,
242          saslServer.getAttemptingUser());
243        throw e;
244      } finally {
245        // release the request buffer as we have already unwrapped all its content
246        callCleanupIfNeeded();
247      }
248      if (replyToken != null) {
249        if (RpcServer.LOG.isDebugEnabled()) {
250          RpcServer.LOG.debug("Will send token of size " + replyToken.length + " from saslServer.");
251        }
252        doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null);
253      }
254      if (saslServer.isComplete()) {
255        finishSaslNegotiation();
256        String qop = saslServer.getNegotiatedQop();
257        useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
258        saslContextEstablished = true;
259      }
260    }
261  }
262
263  /**
264   * Read off the wire. If there is not enough data to read, update the connection state with what
265   * we have and returns.
266   * @return Returns -1 if failure (and caller will close connection), else zero or more.
267   */
268  public int readAndProcess() throws IOException, InterruptedException {
269    // If we have not read the connection setup preamble, look to see if that is on the wire.
270    if (!connectionPreambleRead) {
271      int count = readPreamble();
272      if (!connectionPreambleRead) {
273        return count;
274      }
275    }
276
277    // Try and read in an int. it will be length of the data to read (or -1 if a ping). We catch the
278    // integer length into the 4-byte this.dataLengthBuffer.
279    int count = read4Bytes();
280    if (count < 0 || dataLengthBuffer.remaining() > 0) {
281      return count;
282    }
283
284    // We have read a length and we have read the preamble. It is either the connection header
285    // or it is a request.
286    if (data == null) {
287      dataLengthBuffer.flip();
288      int dataLength = dataLengthBuffer.getInt();
289      if (dataLength == RpcClient.PING_CALL_ID) {
290        if (!useWrap) { // covers the !useSasl too
291          dataLengthBuffer.clear();
292          return 0; // ping message
293        }
294      }
295      if (dataLength < 0) { // A data length of zero is legal.
296        throw new DoNotRetryIOException(
297          "Unexpected data length " + dataLength + "!! from " + getHostAddress());
298      }
299
300      if (dataLength > this.rpcServer.maxRequestSize) {
301        String msg = "RPC data length of " + dataLength + " received from " + getHostAddress()
302          + " is greater than max allowed " + this.rpcServer.maxRequestSize + ". Set \""
303          + SimpleRpcServer.MAX_REQUEST_SIZE
304          + "\" on server to override this limit (not recommended)";
305        SimpleRpcServer.LOG.warn(msg);
306
307        if (connectionHeaderRead && connectionPreambleRead) {
308          incRpcCount();
309          // Construct InputStream for the non-blocking SocketChannel
310          // We need the InputStream because we want to read only the request header
311          // instead of the whole rpc.
312          ByteBuffer buf = ByteBuffer.allocate(1);
313          InputStream is = new InputStream() {
314            @Override
315            public int read() throws IOException {
316              SimpleServerRpcConnection.this.rpcServer.channelRead(channel, buf);
317              buf.flip();
318              int x = buf.get();
319              buf.flip();
320              return x;
321            }
322          };
323          CodedInputStream cis = CodedInputStream.newInstance(is);
324          int headerSize = cis.readRawVarint32();
325          Message.Builder builder = RequestHeader.newBuilder();
326          ProtobufUtil.mergeFrom(builder, cis, headerSize);
327          RequestHeader header = (RequestHeader) builder.build();
328
329          // Notify the client about the offending request
330          SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null,
331            null, null, null, this, 0, this.addr, EnvironmentEdgeManager.currentTime(), 0,
332            this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder);
333          RequestTooBigException reqTooBigEx = new RequestTooBigException(msg);
334          this.rpcServer.metrics.exception(reqTooBigEx);
335          // Make sure the client recognizes the underlying exception
336          // Otherwise, throw a DoNotRetryIOException.
337          if (
338            VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
339              RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)
340          ) {
341            reqTooBig.setResponse(null, null, reqTooBigEx, msg);
342          } else {
343            reqTooBig.setResponse(null, null, new DoNotRetryIOException(msg), msg);
344          }
345          // In most cases we will write out the response directly. If not, it is still OK to just
346          // close the connection without writing out the reqTooBig response. Do not try to write
347          // out directly here, and it will cause deserialization error if the connection is slow
348          // and we have a half writing response in the queue.
349          reqTooBig.sendResponseIfReady();
350        }
351        // Close the connection
352        return -1;
353      }
354
355      // Initialize this.data with a ByteBuff.
356      // This call will allocate a ByteBuff to read request into and assign to this.data
357      // Also when we use some buffer(s) from pool, it will create a CallCleanup instance also and
358      // assign to this.callCleanup
359      initByteBuffToReadInto(dataLength);
360
361      // Increment the rpc count. This counter will be decreased when we write
362      // the response. If we want the connection to be detected as idle properly, we
363      // need to keep the inc / dec correct.
364      incRpcCount();
365    }
366
367    count = channelDataRead(channel, data);
368
369    if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
370      process();
371    }
372
373    return count;
374  }
375
376  // It creates the ByteBuff and CallCleanup and assign to Connection instance.
377  private void initByteBuffToReadInto(int length) {
378    this.data = rpcServer.bbAllocator.allocate(length);
379    this.callCleanup = data::release;
380  }
381
382  protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException {
383    int count = buf.read(channel);
384    if (count > 0) {
385      this.rpcServer.metrics.receivedBytes(count);
386    }
387    return count;
388  }
389
390  /**
391   * Process the data buffer and clean the connection state for the next call.
392   */
393  private void process() throws IOException, InterruptedException {
394    data.rewind();
395    try {
396      if (skipInitialSaslHandshake) {
397        skipInitialSaslHandshake = false;
398        return;
399      }
400
401      if (useSasl) {
402        saslReadAndProcess(data);
403      } else {
404        processOneRpc(data);
405      }
406    } catch (Exception e) {
407      callCleanupIfNeeded();
408      throw e;
409    } finally {
410      dataLengthBuffer.clear(); // Clean for the next call
411      data = null; // For the GC
412      this.callCleanup = null;
413    }
414  }
415
416  @Override
417  public synchronized void close() {
418    disposeSasl();
419    data = null;
420    callCleanupIfNeeded();
421    if (!channel.isOpen()) {
422      return;
423    }
424    try {
425      socket.shutdownOutput();
426    } catch (Exception ignored) {
427      if (SimpleRpcServer.LOG.isTraceEnabled()) {
428        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
429      }
430    }
431    if (channel.isOpen()) {
432      try {
433        channel.close();
434      } catch (Exception ignored) {
435      }
436    }
437    try {
438      socket.close();
439    } catch (Exception ignored) {
440      if (SimpleRpcServer.LOG.isTraceEnabled()) {
441        SimpleRpcServer.LOG.trace("Ignored exception", ignored);
442      }
443    }
444  }
445
446  @Override
447  public boolean isConnectionOpen() {
448    return channel.isOpen();
449  }
450
451  @Override
452  public SimpleServerCall createCall(int id, BlockingService service, MethodDescriptor md,
453    RequestHeader header, Message param, ExtendedCellScanner cellScanner, long size,
454    InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
455    return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size,
456      remoteAddress, EnvironmentEdgeManager.currentTime(), timeout, this.rpcServer.bbAllocator,
457      this.rpcServer.cellBlockBuilder, reqCleanup, this.responder);
458  }
459
460  @Override
461  protected void doRespond(RpcResponse resp) throws IOException {
462    responder.doRespond(this, resp);
463  }
464}