001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
021import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
023
024import io.opentelemetry.context.Scope;
025import java.io.BufferedInputStream;
026import java.io.BufferedOutputStream;
027import java.io.DataInputStream;
028import java.io.DataOutputStream;
029import java.io.IOException;
030import java.io.InputStream;
031import java.io.InterruptedIOException;
032import java.io.OutputStream;
033import java.net.InetSocketAddress;
034import java.net.Socket;
035import java.net.SocketTimeoutException;
036import java.security.PrivilegedExceptionAction;
037import java.util.ArrayDeque;
038import java.util.Locale;
039import java.util.Queue;
040import java.util.Set;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.ThreadLocalRandom;
044import java.util.concurrent.atomic.AtomicInteger;
045import javax.security.sasl.SaslException;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.DoNotRetryIOException;
048import org.apache.hadoop.hbase.client.ConnectionUtils;
049import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
050import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
051import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
052import org.apache.hadoop.hbase.log.HBaseMarkers;
053import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
054import org.apache.hadoop.hbase.security.SaslUtil;
055import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
056import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.ExceptionUtil;
059import org.apache.hadoop.io.IOUtils;
060import org.apache.hadoop.ipc.RemoteException;
061import org.apache.hadoop.net.NetUtils;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.util.StringUtils;
064import org.apache.yetus.audience.InterfaceAudience;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
069import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
070import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
071
072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
076
077/**
078 * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
079 * remote address. Calls are multiplexed through this socket: responses may be delivered out of
080 * order.
081 */
082@InterfaceAudience.Private
083class BlockingRpcConnection extends RpcConnection implements Runnable {
084
  private static final Logger LOG = LoggerFactory.getLogger(BlockingRpcConnection.class);

  // The client that owns this connection; its configuration, socket factory and timeouts are
  // read from here when the connection is set up.
  private final BlockingRpcClient rpcClient;

  // Base name shared by the reader thread (suffixed with the attempt number) and, when enabled,
  // the writer thread (suffixed with " - writer").
  private final String threadName;
  // The reader thread; assigned at the end of setupIOstreams once the connection is established.
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
      justification = "We are always under lock actually")
  private Thread thread;

  // Used for ensuring two reader threads don't run over each other. Should only be used
  // in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
  private final Object readerThreadLock = new Object();

  // Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
  private final AtomicInteger attempts = new AtomicInteger();

  // connected socket. protected for writing UT.
  protected Socket socket = null;
  // Buffered data streams layered over the socket streams (or over the SASL streams when SASL
  // negotiation succeeded); see createStreams and negotiateCryptoAes.
  private DataInputStream in;
  private DataOutputStream out;

  // SASL client for the current connection attempt; created in setupSaslConnection and reset to
  // null by disposeSasl or when the server asks to fall back to simple auth.
  private HBaseSaslRpcClient saslRpcClient;

  // currently active calls, keyed by call id
  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();

  // Dedicated writer thread; null unless BlockingRpcClient.SPECIFIC_WRITE_THREAD is enabled.
  private final CallSender callSender;

  // Checked by the CallSender loop to decide when to stop.
  private boolean closed = false;

  // Pre-computed connection preamble bytes, written first on every new socket.
  private byte[] connectionHeaderPreamble;

  // Pre-serialized connection header: a 4-byte length followed by the ConnectionHeader message.
  private byte[] connectionHeaderWithLength;

  // Set by writeConnectionHeader when Crypto AES is to be negotiated, meaning the server is
  // expected to answer the connection header; cleared once that response has been processed.
  private boolean waitingConnectionHeaderResponse = false;
120
121  /**
122   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
123   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
124   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
125   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
126   * adds a thread per region server in the client, so it's kept as an option.
127   * <p>
128   * The implementation is simple: the client threads adds their call to the queue, and then wait
129   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
130   * interruption, the client cancels its call. The CallSender checks that the call has not been
131   * canceled before writing it.
132   * </p>
133   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
134   * notified with an appropriate exception, as if the call was already sent but the answer not yet
135   * received.
136   * </p>
137   */
138  private class CallSender extends Thread {
139
140    private final Queue<Call> callsToWrite;
141
142    private final int maxQueueSize;
143
144    public CallSender(String name, Configuration conf) {
145      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
146      callsToWrite = new ArrayDeque<>(queueSize);
147      this.maxQueueSize = queueSize;
148      setDaemon(true);
149      setName(name + " - writer");
150    }
151
152    public void sendCall(final Call call) throws IOException {
153      if (callsToWrite.size() >= maxQueueSize) {
154        throw new IOException("Can't add " + call.toShortString()
155          + " to the write queue. callsToWrite.size()=" + callsToWrite.size());
156      }
157      callsToWrite.offer(call);
158      BlockingRpcConnection.this.notifyAll();
159    }
160
161    public void remove(Call call) {
162      callsToWrite.remove(call);
163      // By removing the call from the expected call list, we make the list smaller, but
164      // it means as well that we don't know how many calls we cancelled.
165      calls.remove(call.id);
166      call.setException(new CallCancelledException(call.toShortString() + ", waitTime="
167        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
168        + call.timeout));
169    }
170
171    /**
172     * Reads the call from the queue, write them on the socket.
173     */
174    @Override
175    public void run() {
176      synchronized (BlockingRpcConnection.this) {
177        while (!closed) {
178          if (callsToWrite.isEmpty()) {
179            // We should use another monitor object here for better performance since the read
180            // thread also uses ConnectionImpl.this. But this makes the locking schema more
181            // complicated, can do it later as an optimization.
182            try {
183              BlockingRpcConnection.this.wait();
184            } catch (InterruptedException e) {
185              // Restore interrupt status
186              Thread.currentThread().interrupt();
187            }
188            // check if we need to quit, so continue the main loop instead of fallback.
189            continue;
190          }
191          Call call = callsToWrite.poll();
192          if (call.isDone()) {
193            continue;
194          }
195          try (Scope scope = call.span.makeCurrent()) {
196            writeRequest(call);
197          } catch (IOException e) {
198            // exception here means the call has not been added to the pendingCalls yet, so we need
199            // to fail it by our own.
200            LOG.debug("call write error for {}", call.toShortString());
201            call.setException(e);
202            closeConn(e);
203          }
204        }
205      }
206    }
207
208    /**
209     * Cleans the call not yet sent when we finish.
210     */
211    public void cleanup(IOException e) {
212      IOException ie =
213        new ConnectionClosingException("Connection to " + remoteId.getAddress() + " is closing.");
214      for (Call call : callsToWrite) {
215        call.setException(ie);
216      }
217      callsToWrite.clear();
218    }
219  }
220
221  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
222    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
223      rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor,
224      rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes);
225    this.rpcClient = rpcClient;
226    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
227    ConnectionHeader header = getConnectionHeader();
228    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
229    DataOutputStream dos = new DataOutputStream(baos);
230    dos.writeInt(header.getSerializedSize());
231    header.writeTo(dos);
232    assert baos.size() == 4 + header.getSerializedSize();
233    this.connectionHeaderWithLength = baos.getBuffer();
234
235    UserGroupInformation ticket = remoteId.ticket.getUGI();
236    this.threadName = "BRPC Connection (" + this.rpcClient.socketFactory.hashCode() + ") to "
237      + remoteId.getAddress().toString()
238      + ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));
239
240    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
241      callSender = new CallSender(threadName, this.rpcClient.conf);
242      callSender.start();
243    } else {
244      callSender = null;
245    }
246  }
247
248  // protected for write UT.
249  protected void setupConnection() throws IOException {
250    short ioFailures = 0;
251    short timeoutFailures = 0;
252    while (true) {
253      try {
254        this.socket = this.rpcClient.socketFactory.createSocket();
255        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
256        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
257        if (this.rpcClient.localAddr != null) {
258          this.socket.bind(this.rpcClient.localAddr);
259        }
260        InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
261        NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
262        this.socket.setSoTimeout(this.rpcClient.readTO);
263        return;
264      } catch (SocketTimeoutException toe) {
265        /*
266         * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
267         */
268        if (LOG.isDebugEnabled()) {
269          LOG.debug(
270            "Received exception in connection setup.\n" + StringUtils.stringifyException(toe));
271        }
272        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
273      } catch (IOException ie) {
274        if (LOG.isDebugEnabled()) {
275          LOG.debug(
276            "Received exception in connection setup.\n" + StringUtils.stringifyException(ie));
277        }
278        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
279      }
280    }
281  }
282
283  /**
284   * Handle connection failures If the current number of retries is equal to the max number of
285   * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting
286   * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence
287   * the sleep is synchronized; the locks will be retained.
288   * @param curRetries current number of retries
289   * @param maxRetries max number of retries allowed
290   * @param ioe        failure reason
291   * @throws IOException if max number of retries is reached
292   */
293  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
294    throws IOException {
295    closeSocket();
296
297    // throw the exception if the maximum number of retries is reached
298    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
299      throw ioe;
300    }
301
302    // otherwise back off and retry
303    try {
304      Thread.sleep(this.rpcClient.failureSleep);
305    } catch (InterruptedException ie) {
306      ExceptionUtil.rethrowIfInterrupt(ie);
307    }
308
309    if (LOG.isInfoEnabled()) {
310      LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping "
311        + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
312    }
313  }
314
315  /*
316   * wait till someone signals us to start reading RPC response or it is idle too long, it is marked
317   * as to be closed, or the client is marked as not running.
318   * @return true if it is time to read a response; false otherwise.
319   */
320  private synchronized boolean waitForWork() {
321    // beware of the concurrent access to the calls list: we can add calls, but as well
322    // remove them.
323    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
324    for (;;) {
325      if (thread == null) {
326        return false;
327      }
328
329      // If closeConn is called while we are in the readResponse method, it's possible that a new
330      // call to setupIOStreams comes in and creates a new value for "thread" before readResponse
331      // finishes. Once readResponse finishes, it will come in here and thread will be non-null
332      // above, but pointing at a new thread. In that case, we should end to avoid a situation
333      // where two threads are forever competing for the same socket.
334      if (!isCurrentThreadExpected()) {
335        LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop.");
336        return false;
337      }
338
339      if (!calls.isEmpty()) {
340        return true;
341      }
342      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
343        closeConn(
344          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
345        return false;
346      }
347      try {
348        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
349      } catch (InterruptedException e) {
350        // Restore interrupt status
351        Thread.currentThread().interrupt();
352
353        String msg = "Interrupted while waiting for work";
354
355        // If we were interrupted by closeConn, it would have set thread to null.
356        // We are synchronized here and if we somehow got interrupted without setting thread to
357        // null, we want to make sure the connection is closed since the read thread would be dead.
358        // Rather than do a null check here, we check if the current thread is the expected thread.
359        // This guards against the case where a call to setupIOStreams got the synchronized lock
360        // first after closeConn, thus changing the thread to a new thread.
361        if (isCurrentThreadExpected()) {
362          LOG.debug(msg + ", closing connection");
363          closeConn(new InterruptedIOException(msg));
364        } else {
365          LOG.debug(msg);
366        }
367
368        return false;
369      }
370    }
371  }
372
373  @Override
374  public void run() {
375    if (LOG.isTraceEnabled()) {
376      LOG.trace("starting");
377    }
378
379    // We have a synchronization here because it's possible in error scenarios for a new
380    // thread to be started while readResponse is still reading on the socket. We don't want
381    // two threads to be reading from the same socket/inputstream.
382    // The below calls can synchronize on "BlockingRpcConnection.this".
383    // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
384    synchronized (readerThreadLock) {
385      if (LOG.isTraceEnabled()) {
386        LOG.trace("started");
387      }
388      while (waitForWork()) {
389        readResponse();
390      }
391    }
392    if (LOG.isTraceEnabled()) {
393      LOG.trace("stopped");
394    }
395  }
396
397  private void disposeSasl() {
398    if (saslRpcClient != null) {
399      saslRpcClient.dispose();
400      saslRpcClient = null;
401    }
402  }
403
404  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2,
405    String serverPrincipal) throws IOException {
406    if (this.metrics != null) {
407      this.metrics.incrNsLookups();
408    }
409    saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
410      socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed,
411      this.rpcClient.conf.get("hbase.rpc.protection",
412        QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
413      this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
414    return saslRpcClient.saslConnect(in2, out2);
415  }
416
417  /**
418   * If multiple clients with the same principal try to connect to the same server at the same time,
419   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
420   * work around this, what is done is that the client backs off randomly and tries to initiate the
421   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
422   * attempted.
423   * <p>
424   * The retry logic is governed by the {@link SaslClientAuthenticationProvider#canRetry()} method.
425   * Some providers have the ability to obtain new credentials and then re-attempt to authenticate
426   * with HBase services. Other providers will continue to fail if they failed the first time -- for
427   * those, we want to fail-fast.
428   * </p>
429   */
430  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
431    final Exception ex, final UserGroupInformation user, final String serverPrincipal)
432    throws IOException, InterruptedException {
433    closeSocket();
434    user.doAs(new PrivilegedExceptionAction<Object>() {
435      @Override
436      public Object run() throws IOException, InterruptedException {
437        // A provider which failed authentication, but doesn't have the ability to relogin with
438        // some external system (e.g. username/password, the password either works or it doesn't)
439        if (!provider.canRetry()) {
440          LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(),
441            ex);
442          if (ex instanceof RemoteException) {
443            throw (RemoteException) ex;
444          }
445          if (ex instanceof SaslException) {
446            String msg = "SASL authentication failed."
447              + " The most likely cause is missing or invalid credentials.";
448            throw new RuntimeException(msg, ex);
449          }
450          throw new IOException(ex);
451        }
452
453        // Other providers, like kerberos, could request a new ticket from a keytab. Let
454        // them try again.
455        if (currRetries < maxRetries) {
456          LOG.debug("Exception encountered while connecting to the server " + remoteId.getAddress(),
457            ex);
458
459          // Invoke the provider to perform the relogin
460          provider.relogin();
461
462          // Get rid of any old state on the SaslClient
463          disposeSasl();
464
465          // have granularity of milliseconds
466          // we are sleeping with the Connection lock held but since this
467          // connection instance is being used for connecting to the server
468          // in question, it is okay
469          Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
470          return null;
471        } else {
472          String msg = "Failed to initiate connection for "
473            + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
474          throw new IOException(msg, ex);
475        }
476      }
477    });
478  }
479
480  private void getConnectionRegistry(InputStream inStream, OutputStream outStream,
481    Call connectionRegistryCall) throws IOException {
482    outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER);
483    readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, remoteExc -> {
484      synchronized (this) {
485        closeConn(remoteExc);
486      }
487    });
488  }
489
490  private void createStreams(InputStream inStream, OutputStream outStream) {
491    this.in = new DataInputStream(new BufferedInputStream(inStream));
492    this.out = new DataOutputStream(new BufferedOutputStream(outStream));
493  }
494
495  // choose the server principal to use
496  private String chooseServerPrincipal(InputStream inStream, OutputStream outStream)
497    throws IOException {
498    Set<String> serverPrincipals = getServerPrincipals();
499    if (serverPrincipals.size() == 1) {
500      return serverPrincipals.iterator().next();
501    }
502    // this means we use kerberos authentication and there are multiple server principal candidates,
503    // in this way we need to send a special preamble header to get server principal from server
504    Call securityPreambleCall = createSecurityPreambleCall(r -> {
505    });
506    outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER);
507    readResponse(new DataInputStream(inStream), calls, securityPreambleCall, remoteExc -> {
508      synchronized (this) {
509        closeConn(remoteExc);
510      }
511    });
512    if (securityPreambleCall.error != null) {
513      LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address,
514        securityPreambleCall.error);
515      if (ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error)) {
516        // this means we are connecting to an old server which does not support the security
517        // preamble call, so we should fallback to randomly select a principal to use
518        // TODO: find a way to reconnect without failing all the pending calls, for now, when we
519        // reach here, shutdown should have already been scheduled
520        throw securityPreambleCall.error;
521      }
522      if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) {
523        // server tells us security is not enabled, then we should check whether fallback to
524        // simple is allowed, if so we just go without security, otherwise we should fail the
525        // negotiation immediately
526        if (rpcClient.fallbackAllowed) {
527          // TODO: just change the preamble and skip the fallback to simple logic, for now, just
528          // select the first principal can finish the connection setup, but waste one client
529          // message
530          return serverPrincipals.iterator().next();
531        } else {
532          throw new FallbackDisallowedException();
533        }
534      }
535      return randomSelect(serverPrincipals);
536    }
537    return chooseServerPrincipal(serverPrincipals, securityPreambleCall);
538  }
539
540  private void setupIOstreams(Call connectionRegistryCall) throws IOException {
541    if (socket != null) {
542      // The connection is already available. Perfect.
543      return;
544    }
545
546    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
547      if (LOG.isDebugEnabled()) {
548        LOG.debug("Not trying to connect to " + remoteId.getAddress()
549          + " this server is in the failed servers list");
550      }
551      throw new FailedServerException(
552        "This server is in the failed servers list: " + remoteId.getAddress());
553    }
554
555    try {
556      if (LOG.isDebugEnabled()) {
557        LOG.debug("Connecting to " + remoteId.getAddress());
558      }
559
560      short numRetries = 0;
561      int reloginMaxRetries = this.rpcClient.conf.getInt("hbase.security.relogin.maxretries", 5);
562      while (true) {
563        setupConnection();
564        InputStream inStream = NetUtils.getInputStream(socket);
565        // This creates a socket with a write timeout. This timeout cannot be changed.
566        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
567        if (connectionRegistryCall != null) {
568          getConnectionRegistry(inStream, outStream, connectionRegistryCall);
569          closeSocket();
570          return;
571        }
572
573        if (useSasl) {
574          UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
575          boolean continueSasl;
576          if (ticket == null) {
577            throw new FatalConnectionException("ticket/user is null");
578          }
579          String serverPrincipal = chooseServerPrincipal(inStream, outStream);
580          // Write out the preamble -- MAGIC, version, and auth to use.
581          writeConnectionHeaderPreamble(outStream);
582          try {
583            final InputStream in2 = inStream;
584            final OutputStream out2 = outStream;
585            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
586              @Override
587              public Boolean run() throws IOException {
588                return setupSaslConnection(in2, out2, serverPrincipal);
589              }
590            });
591          } catch (Exception ex) {
592            ExceptionUtil.rethrowIfInterrupt(ex);
593            saslNegotiationDone(serverPrincipal, false);
594            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket,
595              serverPrincipal);
596            continue;
597          }
598          saslNegotiationDone(serverPrincipal, true);
599          if (continueSasl) {
600            // Sasl connect is successful. Let's set up Sasl i/o streams.
601            inStream = saslRpcClient.getInputStream();
602            outStream = saslRpcClient.getOutputStream();
603          } else {
604            // fall back to simple auth because server told us so.
605            // do not change authMethod and useSasl here, we should start from secure when
606            // reconnecting because regionserver may change its sasl config after restart.
607            saslRpcClient = null;
608          }
609        } else {
610          // Write out the preamble -- MAGIC, version, and auth to use.
611          writeConnectionHeaderPreamble(outStream);
612        }
613        createStreams(inStream, outStream);
614        // Now write out the connection header
615        writeConnectionHeader();
616        // process the response from server for connection header if necessary
617        processResponseForConnectionHeader();
618        break;
619      }
620    } catch (Throwable t) {
621      closeSocket();
622      IOException e = ExceptionUtil.asInterrupt(t);
623      if (e == null) {
624        this.rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), t);
625        if (t instanceof LinkageError) {
626          // probably the hbase hadoop version does not match the running hadoop version
627          e = new DoNotRetryIOException(t);
628        } else if (t instanceof IOException) {
629          e = (IOException) t;
630        } else {
631          e = new IOException("Could not set up IO Streams to " + remoteId.getAddress(), t);
632        }
633      }
634      throw e;
635    }
636
637    // start the receiver thread after the socket connection has been set up
638    thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")");
639    thread.setDaemon(true);
640    thread.start();
641  }
642
643  /**
644   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
645   */
646  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
647    out.write(connectionHeaderPreamble);
648    out.flush();
649  }
650
651  /**
652   * Write the connection header.
653   */
654  private void writeConnectionHeader() throws IOException {
655    boolean isCryptoAesEnable = false;
656    // check if Crypto AES is enabled
657    if (saslRpcClient != null) {
658      boolean saslEncryptionEnabled = SaslUtil.QualityOfProtection.PRIVACY.getSaslQop()
659        .equalsIgnoreCase(saslRpcClient.getSaslQOP());
660      isCryptoAesEnable = saslEncryptionEnabled
661        && conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT);
662    }
663
664    // if Crypto AES is enabled, set transformation and negotiate with server
665    if (isCryptoAesEnable) {
666      waitingConnectionHeaderResponse = true;
667    }
668    this.out.write(connectionHeaderWithLength);
669    this.out.flush();
670  }
671
672  private void processResponseForConnectionHeader() throws IOException {
673    // if no response excepted, return
674    if (!waitingConnectionHeaderResponse) return;
675    try {
676      // read the ConnectionHeaderResponse from server
677      int len = this.in.readInt();
678      byte[] buff = new byte[len];
679      int readSize = this.in.read(buff);
680      if (LOG.isDebugEnabled()) {
681        LOG.debug("Length of response for connection header:" + readSize);
682      }
683
684      RPCProtos.ConnectionHeaderResponse connectionHeaderResponse =
685        RPCProtos.ConnectionHeaderResponse.parseFrom(buff);
686
687      // Get the CryptoCipherMeta, update the HBaseSaslRpcClient for Crypto Cipher
688      if (connectionHeaderResponse.hasCryptoCipherMeta()) {
689        negotiateCryptoAes(connectionHeaderResponse.getCryptoCipherMeta());
690      }
691      waitingConnectionHeaderResponse = false;
692    } catch (SocketTimeoutException ste) {
693      LOG.error(HBaseMarkers.FATAL,
694        "Can't get the connection header response for rpc timeout, "
695          + "please check if server has the correct configuration to support the additional "
696          + "function.",
697        ste);
698      // timeout when waiting the connection header response, ignore the additional function
699      throw new IOException("Timeout while waiting connection header response", ste);
700    }
701  }
702
703  private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta) throws IOException {
704    // initialize the Crypto AES with CryptoCipherMeta
705    saslRpcClient.initCryptoCipher(cryptoCipherMeta, this.rpcClient.conf);
706    // reset the inputStream/outputStream for Crypto AES encryption
707    this.in = new DataInputStream(new BufferedInputStream(saslRpcClient.getInputStream()));
708    this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
709  }
710
711  /**
712   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
713   * the Connection thread, but by other threads.
714   * @see #readResponse()
715   */
716  private void writeRequest(Call call) throws IOException {
717    ByteBuf cellBlock = null;
718    try {
719      cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor,
720        call.cells, PooledByteBufAllocator.DEFAULT);
721      CellBlockMeta cellBlockMeta;
722      if (cellBlock != null) {
723        cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.readableBytes()).build();
724      } else {
725        cellBlockMeta = null;
726      }
727      RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
728      if (call.isConnectionRegistryCall()) {
729        setupIOstreams(call);
730        return;
731      }
732      setupIOstreams(null);
733
734      // Now we're going to write the call. We take the lock, then check that the connection
735      // is still valid, and, if so we do the write to the socket. If the write fails, we don't
736      // know where we stand, we have to close the connection.
737      if (Thread.interrupted()) {
738        throw new InterruptedIOException();
739      }
740
741      calls.put(call.id, call); // We put first as we don't want the connection to become idle.
742      // from here, we do not throw any exception to upper layer as the call has been tracked in
743      // the pending calls map.
744      try {
745        call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
746      } catch (Throwable t) {
747        if (LOG.isTraceEnabled()) {
748          LOG.trace("Error while writing {}", call.toShortString(), t);
749        }
750        IOException e = IPCUtil.toIOE(t);
751        closeConn(e);
752        return;
753      }
754    } finally {
755      if (cellBlock != null) {
756        cellBlock.release();
757      }
758    }
759    notifyAll();
760  }
761
762  /*
763   * Receive a response. Because only one receiver, so no synchronization on in.
764   */
765  private void readResponse() {
766    try {
767      readResponse(in, calls, null, remoteExc -> {
768        synchronized (this) {
769          closeConn(remoteExc);
770        }
771      });
772    } catch (IOException e) {
773      if (e instanceof SocketTimeoutException) {
774        // Clean up open calls but don't treat this as a fatal condition,
775        // since we expect certain responses to not make it by the specified
776        // {@link ConnectionId#rpcTimeout}.
777        if (LOG.isTraceEnabled()) {
778          LOG.trace("ignored", e);
779        }
780      } else {
781        synchronized (this) {
782          // The exception we received may have been caused by another thread closing
783          // this connection. It's possible that before getting to this point, a new connection was
784          // created. In that case, it doesn't help and can actually hurt to close again here.
785          if (isCurrentThreadExpected()) {
786            LOG.debug("Closing connection after error", e);
787            closeConn(e);
788          }
789        }
790      }
791    }
792  }
793
794  /**
795   * For use in the reader thread, tests if the current reader thread is the one expected to be
796   * running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
797   * creates a new thread and updates the thread pointer. At that point, the new thread should be
798   * the only one running. We use this method to guard against cases where the old thread may be
799   * erroneously running or closing the connection in error states.
800   */
801  private boolean isCurrentThreadExpected() {
802    return thread == Thread.currentThread();
803  }
804
  /**
   * Removes the given call from this connection's pending {@code calls} map, keyed by the call id.
   * Runs while holding this connection's monitor.
   * @param call the call to stop tracking
   */
  @Override
  protected synchronized void callTimeout(Call call) {
    // forget the call so it is no longer tracked as pending on this connection
    calls.remove(call.id);
  }
810
811  // just close socket input and output.
812  private void closeSocket() {
813    IOUtils.closeStream(out);
814    IOUtils.closeStream(in);
815    IOUtils.closeSocket(socket);
816    out = null;
817    in = null;
818    socket = null;
819  }
820
821  // close socket, reader, and clean up all pending calls.
822  private void closeConn(IOException e) {
823    if (thread == null) {
824      return;
825    }
826    thread.interrupt();
827    thread = null;
828    closeSocket();
829    if (callSender != null) {
830      callSender.cleanup(e);
831    }
832    for (Call call : calls.values()) {
833      call.setException(e);
834    }
835    calls.clear();
836  }
837
838  // release all resources, the connection will not be used any more.
839  @Override
840  public synchronized void shutdown() {
841    closed = true;
842    if (callSender != null) {
843      callSender.interrupt();
844    }
845    closeConn(new IOException("connection to " + remoteId.getAddress() + " closed"));
846  }
847
  /**
   * No-op in this connection implementation: the body performs no cleanup work.
   */
  @Override
  public void cleanupConnection() {
    // do nothing
  }
852
853  @Override
854  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
855    throws IOException {
856    pcrc.notifyOnCancel(new RpcCallback<Object>() {
857
858      @Override
859      public void run(Object parameter) {
860        setCancelled(call);
861        synchronized (BlockingRpcConnection.this) {
862          if (callSender != null) {
863            callSender.remove(call);
864          } else {
865            calls.remove(call.id);
866          }
867        }
868      }
869    }, new CancellationCallback() {
870
871      @Override
872      public void run(boolean cancelled) throws IOException {
873        if (cancelled) {
874          setCancelled(call);
875          return;
876        }
877        scheduleTimeoutTask(call);
878        if (callSender != null) {
879          callSender.sendCall(call);
880        } else {
881          // this is in the same thread with the caller so do not need to attach the trace context
882          // again.
883          writeRequest(call);
884        }
885      }
886    });
887  }
888
889  @Override
890  public synchronized boolean isActive() {
891    return thread != null;
892  }
893}