001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.DataInput; 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InputStream; 024import java.net.InetAddress; 025import java.net.InetSocketAddress; 026import java.net.UnknownHostException; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Map; 030import java.util.Set; 031import java.util.TreeSet; 032import java.util.concurrent.ThreadLocalRandom; 033import java.util.concurrent.TimeUnit; 034import java.util.function.Consumer; 035import javax.security.sasl.SaslException; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.CellScanner; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.client.MetricsConnection; 040import org.apache.hadoop.hbase.codec.Codec; 041import org.apache.hadoop.hbase.net.Address; 042import org.apache.hadoop.hbase.security.AuthMethod; 043import org.apache.hadoop.hbase.security.SecurityConstants; 044import org.apache.hadoop.hbase.security.SecurityInfo; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; 047import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders; 048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 049import org.apache.hadoop.hbase.util.Pair; 050import org.apache.hadoop.io.compress.CompressionCodec; 051import org.apache.hadoop.ipc.RemoteException; 052import org.apache.hadoop.security.SecurityUtil; 053import org.apache.hadoop.security.token.Token; 054import org.apache.hadoop.security.token.TokenIdentifier; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.protobuf.Message; 060import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 061import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 062import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 063import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 064import org.apache.hbase.thirdparty.io.netty.util.Timeout; 065import org.apache.hbase.thirdparty.io.netty.util.TimerTask; 066 067import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 074 075/** 076 * Base class for ipc connection. 077 */ 078@InterfaceAudience.Private 079abstract class RpcConnection { 080 081 private static final Logger LOG = LoggerFactory.getLogger(RpcConnection.class); 082 083 protected final ConnectionId remoteId; 084 085 protected final boolean useSasl; 086 087 protected final Token<? extends TokenIdentifier> token; 088 089 protected final SecurityInfo securityInfo; 090 091 protected final int reloginMaxBackoff; // max pause before relogin on sasl failure 092 093 protected final Codec codec; 094 095 protected final CompressionCodec compressor; 096 097 protected final CellBlockBuilder cellBlockBuilder; 098 099 protected final MetricsConnection metrics; 100 private final Map<String, byte[]> connectionAttributes; 101 102 protected final HashedWheelTimer timeoutTimer; 103 104 protected final Configuration conf; 105 106 protected static String CRYPTO_AES_ENABLED_KEY = "hbase.rpc.crypto.encryption.aes.enabled"; 107 108 protected static boolean CRYPTO_AES_ENABLED_DEFAULT = false; 109 110 // the last time we were picked up from connection pool. 111 protected long lastTouched; 112 113 protected SaslClientAuthenticationProvider provider; 114 115 // Record the server principal which we have successfully authenticated with the remote server 116 // this is used to save the extra round trip with server when there are multiple candidate server 117 // principals for a given rpc service, like ClientMetaService. 118 // See HBASE-28321 for more details. 119 private String lastSucceededServerPrincipal; 120 121 protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, 122 String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor, 123 CellBlockBuilder cellBlockBuilder, MetricsConnection metrics, 124 Map<String, byte[]> connectionAttributes) throws IOException { 125 this.timeoutTimer = timeoutTimer; 126 this.codec = codec; 127 this.compressor = compressor; 128 this.cellBlockBuilder = cellBlockBuilder; 129 this.conf = conf; 130 this.metrics = metrics; 131 this.connectionAttributes = connectionAttributes; 132 User ticket = remoteId.getTicket(); 133 this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName()); 134 this.useSasl = isSecurityEnabled; 135 136 // Choose the correct Token and AuthenticationProvider for this client to use 137 SaslClientAuthenticationProviders providers = 138 SaslClientAuthenticationProviders.getInstance(conf); 139 Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair; 140 if (useSasl && securityInfo != null) { 141 pair = providers.selectProvider(clusterId, ticket); 142 if (pair == null) { 143 if (LOG.isTraceEnabled()) { 144 LOG.trace("Found no valid authentication method from providers={} with tokens={}", 145 providers.toString(), ticket.getTokens()); 146 } 147 throw new RuntimeException("Found no valid authentication method from options"); 148 } 149 } else if (!useSasl) { 150 // Hack, while SIMPLE doesn't go via SASL. 151 pair = providers.getSimpleProvider(); 152 } else { 153 throw new RuntimeException("Could not compute valid client authentication provider"); 154 } 155 156 this.provider = pair.getFirst(); 157 this.token = pair.getSecond(); 158 159 LOG.debug("Using {} authentication for service={}, sasl={}", 160 provider.getSaslAuthMethod().getName(), remoteId.serviceName, useSasl); 161 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000); 162 this.remoteId = remoteId; 163 } 164 165 protected final void scheduleTimeoutTask(final Call call) { 166 if (call.timeout > 0) { 167 call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() { 168 169 @Override 170 public void run(Timeout timeout) throws Exception { 171 call.setTimeout(new CallTimeoutException(call.toShortString() + ", waitTime=" 172 + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + "ms, rpcTimeout=" 173 + call.timeout + "ms")); 174 callTimeout(call); 175 } 176 }, call.timeout, TimeUnit.MILLISECONDS); 177 } 178 } 179 180 // will be overridden in tests 181 protected byte[] getConnectionHeaderPreamble() { 182 // Assemble the preamble up in a buffer first and then send it. Writing individual elements, 183 // they are getting sent across piecemeal according to wireshark and then server is messing 184 // up the reading on occasion (the passed in stream is not buffered yet). 185 int rpcHeaderLen = HConstants.RPC_HEADER.length; 186 // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE 187 byte[] preamble = new byte[rpcHeaderLen + 2]; 188 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen); 189 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; 190 synchronized (this) { 191 preamble[preamble.length - 1] = provider.getSaslAuthMethod().getCode(); 192 } 193 return preamble; 194 } 195 196 protected final ConnectionHeader getConnectionHeader() { 197 final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); 198 builder.setServiceName(remoteId.getServiceName()); 199 final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket); 200 if (userInfoPB != null) { 201 builder.setUserInfo(userInfoPB); 202 } 203 if (this.codec != null) { 204 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName()); 205 } 206 if (this.compressor != null) { 207 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName()); 208 } 209 if (connectionAttributes != null && !connectionAttributes.isEmpty()) { 210 HBaseProtos.NameBytesPair.Builder attributeBuilder = HBaseProtos.NameBytesPair.newBuilder(); 211 for (Map.Entry<String, byte[]> attribute : connectionAttributes.entrySet()) { 212 attributeBuilder.setName(attribute.getKey()); 213 attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue())); 214 builder.addAttribute(attributeBuilder.build()); 215 } 216 } 217 builder.setVersionInfo(ProtobufUtil.getVersionInfo()); 218 boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT); 219 // if Crypto AES enable, setup Cipher transformation 220 if (isCryptoAESEnable) { 221 builder.setRpcCryptoCipherTransformation( 222 conf.get("hbase.rpc.crypto.encryption.aes.cipher.transform", "AES/CTR/NoPadding")); 223 } 224 return builder.build(); 225 } 226 227 protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics) 228 throws UnknownHostException { 229 if (metrics != null) { 230 metrics.incrNsLookups(); 231 } 232 InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); 233 if (remoteAddr.isUnresolved()) { 234 if (metrics != null) { 235 metrics.incrNsLookupsFailed(); 236 } 237 throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); 238 } 239 return remoteAddr; 240 } 241 242 private static boolean useCanonicalHostname(Configuration conf) { 243 return !conf.getBoolean( 244 SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, 245 SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS); 246 } 247 248 private static String getHostnameForServerPrincipal(Configuration conf, InetAddress addr) { 249 final String hostname; 250 if (useCanonicalHostname(conf)) { 251 hostname = addr.getCanonicalHostName(); 252 if (hostname.equals(addr.getHostAddress())) { 253 LOG.warn("Canonical hostname for SASL principal is the same with IP address: " + hostname 254 + ", " + addr.getHostName() + ". Check DNS configuration or consider " 255 + SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + "=true"); 256 } 257 } else { 258 hostname = addr.getHostName(); 259 } 260 261 return hostname.toLowerCase(); 262 } 263 264 private static String getServerPrincipal(Configuration conf, String serverKey, InetAddress server) 265 throws IOException { 266 String hostname = getHostnameForServerPrincipal(conf, server); 267 return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname); 268 } 269 270 protected final boolean isKerberosAuth() { 271 return provider.getSaslAuthMethod().getCode() == AuthMethod.KERBEROS.code; 272 } 273 274 protected final Set<String> getServerPrincipals() throws IOException { 275 // for authentication method other than kerberos, we do not need to know the server principal 276 if (!isKerberosAuth()) { 277 return Collections.singleton(HConstants.EMPTY_STRING); 278 } 279 // if we have successfully authenticated last time, just return the server principal we use last 280 // time 281 if (lastSucceededServerPrincipal != null) { 282 return Collections.singleton(lastSucceededServerPrincipal); 283 } 284 InetAddress server = 285 new InetSocketAddress(remoteId.address.getHostName(), remoteId.address.getPort()) 286 .getAddress(); 287 // Even if we have multiple config key in security info, it is still possible that we configured 288 // the same principal for them, so here we use a Set 289 Set<String> serverPrincipals = new TreeSet<>(); 290 for (String serverPrincipalKey : securityInfo.getServerPrincipals()) { 291 serverPrincipals.add(getServerPrincipal(conf, serverPrincipalKey, server)); 292 } 293 return serverPrincipals; 294 } 295 296 protected final <T> T randomSelect(Collection<T> c) { 297 int select = ThreadLocalRandom.current().nextInt(c.size()); 298 int index = 0; 299 for (T t : c) { 300 if (index == select) { 301 return t; 302 } 303 index++; 304 } 305 return null; 306 } 307 308 protected final String chooseServerPrincipal(Set<String> candidates, Call securityPreambleCall) 309 throws SaslException { 310 String principal = 311 ((SecurityPreamableResponse) securityPreambleCall.response).getServerPrincipal(); 312 if (!candidates.contains(principal)) { 313 // this means the server returns principal which is not in our candidates, it could be a 314 // malicious server, stop connecting 315 throw new SaslException(remoteId.address + " tells us to use server principal " + principal 316 + " which is not expected, should be one of " + candidates); 317 } 318 return principal; 319 } 320 321 protected final void saslNegotiationDone(String serverPrincipal, boolean succeed) { 322 LOG.debug("sasl negotiation done with serverPrincipal = {}, succeed = {}", serverPrincipal, 323 succeed); 324 if (succeed) { 325 this.lastSucceededServerPrincipal = serverPrincipal; 326 } else { 327 // clear the recorded principal if authentication failed 328 this.lastSucceededServerPrincipal = null; 329 } 330 } 331 332 protected abstract void callTimeout(Call call); 333 334 public ConnectionId remoteId() { 335 return remoteId; 336 } 337 338 public long getLastTouched() { 339 return lastTouched; 340 } 341 342 public void setLastTouched(long lastTouched) { 343 this.lastTouched = lastTouched; 344 } 345 346 /** 347 * Tell the idle connection sweeper whether we could be swept. 348 */ 349 public abstract boolean isActive(); 350 351 /** 352 * Just close connection. Do not need to remove from connection pool. 353 */ 354 public abstract void shutdown(); 355 356 public abstract void sendRequest(Call call, HBaseRpcController hrc) throws IOException; 357 358 /** 359 * Does the clean up work after the connection is removed from the connection pool 360 */ 361 public abstract void cleanupConnection(); 362 363 protected final Call createSecurityPreambleCall(RpcCallback<Call> callback) { 364 return new Call(-1, null, null, null, SecurityPreamableResponse.getDefaultInstance(), 0, 0, 365 Collections.emptyMap(), callback, MetricsConnection.newCallStats()); 366 } 367 368 private <T extends InputStream & DataInput> void finishCall(ResponseHeader responseHeader, T in, 369 Call call) throws IOException { 370 Message value; 371 if (call.responseDefaultType != null) { 372 Message.Builder builder = call.responseDefaultType.newBuilderForType(); 373 if (!builder.mergeDelimitedFrom(in)) { 374 // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF 375 // before reading any bytes out, so here we need to manually finish create the EOFException 376 // and finish the call 377 call.setException(new EOFException("EOF while reading response with type: " 378 + call.responseDefaultType.getClass().getName())); 379 return; 380 } 381 value = builder.build(); 382 } else { 383 value = null; 384 } 385 CellScanner cellBlockScanner; 386 if (responseHeader.hasCellBlockMeta()) { 387 int size = responseHeader.getCellBlockMeta().getLength(); 388 // Maybe we could read directly from the ByteBuf. 389 // The problem here is that we do not know when to release it. 390 byte[] cellBlock = new byte[size]; 391 in.readFully(cellBlock); 392 cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); 393 } else { 394 cellBlockScanner = null; 395 } 396 call.setResponse(value, cellBlockScanner); 397 } 398 399 <T extends InputStream & DataInput> void readResponse(T in, Map<Integer, Call> id2Call, 400 Call preambleCall, Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException { 401 int totalSize = in.readInt(); 402 ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); 403 int id = responseHeader.getCallId(); 404 if (LOG.isTraceEnabled()) { 405 LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) 406 + ", totalSize: " + totalSize + " bytes"); 407 } 408 RemoteException remoteExc; 409 if (responseHeader.hasException()) { 410 ExceptionResponse exceptionResponse = responseHeader.getException(); 411 remoteExc = IPCUtil.createRemoteException(exceptionResponse); 412 if (IPCUtil.isFatalConnectionException(exceptionResponse)) { 413 // Here we will cleanup all calls so do not need to fall back, just return. 414 fatalConnectionErrorConsumer.accept(remoteExc); 415 if (preambleCall != null) { 416 preambleCall.setException(remoteExc); 417 } 418 return; 419 } 420 } else { 421 remoteExc = null; 422 } 423 if (id < 0) { 424 LOG.debug("process preamble call response with response type {}", 425 preambleCall != null 426 ? preambleCall.responseDefaultType.getDescriptorForType().getName() 427 : "null"); 428 if (preambleCall == null) { 429 // fall through so later we will skip this response 430 LOG.warn("Got a negative call id {} but there is no preamble call", id); 431 } else { 432 if (remoteExc != null) { 433 preambleCall.setException(remoteExc); 434 } else { 435 finishCall(responseHeader, in, preambleCall); 436 } 437 return; 438 } 439 } 440 Call call = id2Call.remove(id); 441 if (call == null) { 442 // So we got a response for which we have no corresponding 'call' here on the client-side. 443 // We probably timed out waiting, cleaned up all references, and now the server decides 444 // to return a response. There is nothing we can do w/ the response at this stage. Clean 445 // out the wire of the response so its out of the way and we can get other responses on 446 // this connection. 447 if (LOG.isDebugEnabled()) { 448 int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); 449 int whatIsLeftToRead = totalSize - readSoFar; 450 LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead 451 + " bytes"); 452 } 453 return; 454 } 455 call.callStats.setResponseSizeBytes(totalSize); 456 if (remoteExc != null) { 457 call.setException(remoteExc); 458 return; 459 } 460 try { 461 finishCall(responseHeader, in, call); 462 } catch (IOException e) { 463 // As the call has been removed from id2Call map, if we hit an exception here, the 464 // exceptionCaught method can not help us finish the call, so here we need to catch the 465 // exception and finish it 466 call.setException(e); 467 // throw the exception out, the upper layer should determine whether this is a critical 468 // problem 469 throw e; 470 } 471 } 472}