001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.asyncfs; 019 020import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite; 021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; 022import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; 023 024import java.io.IOException; 025import java.lang.reflect.Constructor; 026import java.lang.reflect.Field; 027import java.lang.reflect.InvocationTargetException; 028import java.lang.reflect.Method; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.nio.ByteBuffer; 032import java.security.GeneralSecurityException; 033import java.util.Arrays; 034import java.util.Base64; 035import java.util.Collections; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicBoolean; 041import javax.security.auth.callback.Callback; 042import javax.security.auth.callback.CallbackHandler; 043import javax.security.auth.callback.NameCallback; 044import javax.security.auth.callback.PasswordCallback; 045import javax.security.auth.callback.UnsupportedCallbackException; 046import javax.security.sasl.RealmCallback; 047import javax.security.sasl.RealmChoiceCallback; 048import javax.security.sasl.Sasl; 049import javax.security.sasl.SaslClient; 050import javax.security.sasl.SaslException; 051import org.apache.commons.lang3.StringUtils; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.crypto.CipherOption; 054import org.apache.hadoop.crypto.CipherSuite; 055import org.apache.hadoop.crypto.CryptoCodec; 056import org.apache.hadoop.crypto.Decryptor; 057import org.apache.hadoop.crypto.Encryptor; 058import org.apache.hadoop.crypto.key.KeyProvider; 059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; 060import org.apache.hadoop.fs.FileEncryptionInfo; 061import org.apache.hadoop.hdfs.DFSClient; 062import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; 066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; 067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; 068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; 069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; 070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; 072import org.apache.hadoop.security.SaslPropertiesResolver; 073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; 074import org.apache.hadoop.security.UserGroupInformation; 075import org.apache.hadoop.security.token.Token; 076import org.apache.yetus.audience.InterfaceAudience; 077import org.slf4j.Logger; 078import org.slf4j.LoggerFactory; 079 080import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; 082import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 083import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; 084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; 085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; 086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; 087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; 088import org.apache.hbase.thirdparty.io.netty.channel.Channel; 089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler; 090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; 091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter; 092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; 093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise; 094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; 095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; 096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder; 097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; 099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; 100import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; 101 102/** 103 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 104 */ 105@InterfaceAudience.Private 106public final class FanOutOneBlockAsyncDFSOutputSaslHelper { 107 private static final Logger LOG = 108 LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class); 109 110 private FanOutOneBlockAsyncDFSOutputSaslHelper() { 111 } 112 113 private static final String SERVER_NAME = "0"; 114 private static final String PROTOCOL = "hdfs"; 115 private static final String MECHANISM = 116 org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN.getMechanismName(); 117 private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; 118 private static final String NAME_DELIMITER = " "; 119 120 private interface SaslAdaptor { 121 122 TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); 123 124 SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); 125 126 AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); 127 } 128 129 private static final SaslAdaptor SASL_ADAPTOR; 130 131 private interface TransparentCryptoHelper { 132 133 Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) 134 throws IOException; 135 } 136 137 private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; 138 139 private static SaslAdaptor createSaslAdaptor() 140 throws NoSuchFieldException, NoSuchMethodException { 141 Field saslPropsResolverField = 142 SaslDataTransferClient.class.getDeclaredField("saslPropsResolver"); 143 saslPropsResolverField.setAccessible(true); 144 Field trustedChannelResolverField = 145 SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver"); 146 trustedChannelResolverField.setAccessible(true); 147 Field fallbackToSimpleAuthField = 148 SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth"); 149 fallbackToSimpleAuthField.setAccessible(true); 150 return new SaslAdaptor() { 151 152 @Override 153 public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) { 154 try { 155 return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient); 156 } catch (IllegalAccessException e) { 157 throw new RuntimeException(e); 158 } 159 } 160 161 @Override 162 public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) { 163 try { 164 return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient); 165 } catch (IllegalAccessException e) { 166 throw new RuntimeException(e); 167 } 168 } 169 170 @Override 171 public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) { 172 try { 173 return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient); 174 } catch (IllegalAccessException e) { 175 throw new RuntimeException(e); 176 } 177 } 178 }; 179 } 180 181 private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396() 182 throws NoSuchMethodException { 183 Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class 184 .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class); 185 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 186 return new TransparentCryptoHelper() { 187 188 @Override 189 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 190 DFSClient client) throws IOException { 191 try { 192 KeyVersion decryptedKey = 193 (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo); 194 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 195 Encryptor encryptor = cryptoCodec.createEncryptor(); 196 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 197 return encryptor; 198 } catch (InvocationTargetException e) { 199 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 200 throw new RuntimeException(e.getTargetException()); 201 } catch (GeneralSecurityException e) { 202 throw new IOException(e); 203 } catch (IllegalAccessException e) { 204 throw new RuntimeException(e); 205 } 206 } 207 }; 208 } 209 210 private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396() 211 throws ClassNotFoundException, NoSuchMethodException { 212 Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil"); 213 Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod( 214 "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class); 215 decryptEncryptedDataEncryptionKeyMethod.setAccessible(true); 216 return new TransparentCryptoHelper() { 217 218 @Override 219 public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, 220 DFSClient client) throws IOException { 221 try { 222 KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod 223 .invoke(null, feInfo, client.getKeyProvider()); 224 CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()); 225 Encryptor encryptor = cryptoCodec.createEncryptor(); 226 encryptor.init(decryptedKey.getMaterial(), feInfo.getIV()); 227 return encryptor; 228 } catch (InvocationTargetException e) { 229 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 230 throw new RuntimeException(e.getTargetException()); 231 } catch (GeneralSecurityException e) { 232 throw new IOException(e); 233 } catch (IllegalAccessException e) { 234 throw new RuntimeException(e); 235 } 236 } 237 }; 238 } 239 240 private static TransparentCryptoHelper createTransparentCryptoHelper() 241 throws NoSuchMethodException, ClassNotFoundException { 242 try { 243 return createTransparentCryptoHelperWithoutHDFS12396(); 244 } catch (NoSuchMethodException e) { 245 LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," 246 + " should be hadoop version with HDFS-12396", e); 247 } 248 return createTransparentCryptoHelperWithHDFS12396(); 249 } 250 251 static { 252 try { 253 SASL_ADAPTOR = createSaslAdaptor(); 254 TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); 255 } catch (Exception e) { 256 String msg = "Couldn't properly initialize access to HDFS internals. Please " 257 + "update your WAL Provider to not make use of the 'asyncfs' provider. See " 258 + "HBASE-16110 for more information."; 259 LOG.error(msg, e); 260 throw new Error(msg, e); 261 } 262 } 263 264 /** 265 * Sets user name and password when asked by the client-side SASL object. 266 */ 267 private static final class SaslClientCallbackHandler implements CallbackHandler { 268 269 private final char[] password; 270 private final String userName; 271 272 /** 273 * Creates a new SaslClientCallbackHandler. 274 * @param userName SASL user name 275 * @param password SASL password 276 */ 277 public SaslClientCallbackHandler(String userName, char[] password) { 278 this.password = password; 279 this.userName = userName; 280 } 281 282 @Override 283 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { 284 NameCallback nc = null; 285 PasswordCallback pc = null; 286 RealmCallback rc = null; 287 for (Callback callback : callbacks) { 288 if (callback instanceof RealmChoiceCallback) { 289 continue; 290 } else if (callback instanceof NameCallback) { 291 nc = (NameCallback) callback; 292 } else if (callback instanceof PasswordCallback) { 293 pc = (PasswordCallback) callback; 294 } else if (callback instanceof RealmCallback) { 295 rc = (RealmCallback) callback; 296 } else { 297 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 298 } 299 } 300 if (nc != null) { 301 nc.setName(userName); 302 } 303 if (pc != null) { 304 pc.setPassword(password); 305 } 306 if (rc != null) { 307 rc.setText(rc.getDefaultText()); 308 } 309 } 310 } 311 312 private static final class SaslNegotiateHandler extends ChannelDuplexHandler { 313 314 private final Configuration conf; 315 316 private final Map<String, String> saslProps; 317 318 private final SaslClient saslClient; 319 320 private final int timeoutMs; 321 322 private final Promise<Void> promise; 323 324 private final DFSClient dfsClient; 325 326 private int step = 0; 327 328 public SaslNegotiateHandler(Configuration conf, String username, char[] password, 329 Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, DFSClient dfsClient) 330 throws SaslException { 331 this.conf = conf; 332 this.saslProps = saslProps; 333 this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, 334 SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password)); 335 this.timeoutMs = timeoutMs; 336 this.promise = promise; 337 this.dfsClient = dfsClient; 338 } 339 340 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { 341 sendSaslMessage(ctx, payload, null); 342 } 343 344 private List<CipherOption> getCipherOptions() throws IOException { 345 // Negotiate cipher suites if configured. Currently, the only supported 346 // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple 347 // values for future expansion. 348 String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); 349 if (StringUtils.isBlank(cipherSuites)) { 350 return null; 351 } 352 if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { 353 throw new IOException(String.format("Invalid cipher suite, %s=%s", 354 DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); 355 } 356 return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING)); 357 } 358 359 /** 360 * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. After 361 * Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*. 362 * Use Reflection to check which ones to use. 363 */ 364 private static class BuilderPayloadSetter { 365 private static Method setPayloadMethod; 366 private static Constructor<?> constructor; 367 368 /** 369 * Create a ByteString from byte array without copying (wrap), and then set it as the payload 370 * for the builder. 371 * @param builder builder for HDFS DataTransferEncryptorMessage. 372 * @param payload byte array of payload. 373 */ 374 static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder, 375 byte[] payload) throws IOException { 376 Object byteStringObject; 377 try { 378 // byteStringObject = new LiteralByteString(payload); 379 byteStringObject = constructor.newInstance(payload); 380 // builder.setPayload(byteStringObject); 381 setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject)); 382 } catch (IllegalAccessException | InstantiationException e) { 383 throw new RuntimeException(e); 384 385 } catch (InvocationTargetException e) { 386 Throwables.propagateIfPossible(e.getTargetException(), IOException.class); 387 throw new RuntimeException(e.getTargetException()); 388 } 389 } 390 391 static { 392 Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class; 393 394 // Try the unrelocated ByteString 395 Class<?> byteStringClass; 396 try { 397 // See if it can load the relocated ByteString, which comes from hadoop-thirdparty. 398 byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString"); 399 LOG.debug("Found relocated ByteString class from hadoop-thirdparty." 400 + " Assuming this is Hadoop 3.3.0+."); 401 } catch (ClassNotFoundException e) { 402 LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty." 403 + " Assuming this is below Hadoop 3.3.0", e); 404 try { 405 byteStringClass = Class.forName("com.google.protobuf.ByteString"); 406 LOG.debug("com.google.protobuf.ByteString found."); 407 } catch (ClassNotFoundException ex) { 408 throw new RuntimeException(ex); 409 } 410 } 411 412 // LiteralByteString is a package private class in protobuf. Make it accessible. 413 Class<?> literalByteStringClass; 414 try { 415 literalByteStringClass = 416 Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString"); 417 LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found."); 418 } catch (ClassNotFoundException e) { 419 try { 420 literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString"); 421 LOG.debug("com.google.protobuf.LiteralByteString found."); 422 } catch (ClassNotFoundException ex) { 423 throw new RuntimeException(ex); 424 } 425 } 426 427 try { 428 constructor = literalByteStringClass.getDeclaredConstructor(byte[].class); 429 constructor.setAccessible(true); 430 } catch (NoSuchMethodException e) { 431 throw new RuntimeException(e); 432 } 433 434 try { 435 setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass); 436 } catch (NoSuchMethodException e) { 437 // if either method is not found, we are in big trouble. Abort. 438 throw new RuntimeException(e); 439 } 440 } 441 } 442 443 private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 444 List<CipherOption> options) throws IOException { 445 DataTransferEncryptorMessageProto.Builder builder = 446 DataTransferEncryptorMessageProto.newBuilder(); 447 builder.setStatus(DataTransferEncryptorStatus.SUCCESS); 448 if (payload != null) { 449 BuilderPayloadSetter.wrapAndSetPayload(builder, payload); 450 } 451 if (options != null) { 452 builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); 453 } 454 DataTransferEncryptorMessageProto proto = builder.build(); 455 int size = proto.getSerializedSize(); 456 size += CodedOutputStream.computeUInt32SizeNoTag(size); 457 ByteBuf buf = ctx.alloc().buffer(size); 458 proto.writeDelimitedTo(new ByteBufOutputStream(buf)); 459 safeWrite(ctx, buf); 460 } 461 462 @Override 463 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 464 safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); 465 byte[] firstMessage = new byte[0]; 466 if (saslClient.hasInitialResponse()) { 467 firstMessage = saslClient.evaluateChallenge(firstMessage); 468 } 469 sendSaslMessage(ctx, firstMessage); 470 ctx.flush(); 471 step++; 472 } 473 474 @Override 475 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 476 saslClient.dispose(); 477 } 478 479 private void check(DataTransferEncryptorMessageProto proto) throws IOException { 480 if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { 481 dfsClient.clearDataEncryptionKey(); 482 throw new InvalidEncryptionKeyException(proto.getMessage()); 483 } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { 484 throw new IOException(proto.getMessage()); 485 } 486 } 487 488 private String getNegotiatedQop() { 489 return (String) saslClient.getNegotiatedProperty(Sasl.QOP); 490 } 491 492 private boolean isNegotiatedQopPrivacy() { 493 String qop = getNegotiatedQop(); 494 return qop != null && "auth-conf".equalsIgnoreCase(qop); 495 } 496 497 private boolean requestedQopContainsPrivacy() { 498 Set<String> requestedQop = 499 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 500 return requestedQop.contains("auth-conf"); 501 } 502 503 private void checkSaslComplete() throws IOException { 504 if (!saslClient.isComplete()) { 505 throw new IOException("Failed to complete SASL handshake"); 506 } 507 Set<String> requestedQop = 508 ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); 509 String negotiatedQop = getNegotiatedQop(); 510 // Treat null negotiated QOP as "auth" for the purpose of verification 511 // Code elsewhere does the same implicitly 512 if (negotiatedQop == null) { 513 negotiatedQop = "auth"; 514 } 515 LOG.debug( 516 "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop); 517 if (!requestedQop.contains(negotiatedQop)) { 518 throw new IOException(String.format("SASL handshake completed, but " 519 + "channel does not have acceptable quality of protection, " 520 + "requested = %s, negotiated(effective) = %s", requestedQop, negotiatedQop)); 521 } 522 } 523 524 private boolean useWrap() { 525 String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); 526 return qop != null && !"auth".equalsIgnoreCase(qop); 527 } 528 529 private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException { 530 byte[] inKey = option.getInKey(); 531 if (inKey != null) { 532 inKey = saslClient.unwrap(inKey, 0, inKey.length); 533 } 534 byte[] outKey = option.getOutKey(); 535 if (outKey != null) { 536 outKey = saslClient.unwrap(outKey, 0, outKey.length); 537 } 538 return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, 539 option.getOutIv()); 540 } 541 542 private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto, 543 boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { 544 List<CipherOption> cipherOptions = 545 PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList()); 546 if (cipherOptions == null || cipherOptions.isEmpty()) { 547 return null; 548 } 549 CipherOption cipherOption = cipherOptions.get(0); 550 return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; 551 } 552 553 @Override 554 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 555 if (msg instanceof DataTransferEncryptorMessageProto) { 556 DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; 557 check(proto); 558 byte[] challenge = proto.getPayload().toByteArray(); 559 byte[] response = saslClient.evaluateChallenge(challenge); 560 switch (step) { 561 case 1: { 562 List<CipherOption> cipherOptions = null; 563 if (requestedQopContainsPrivacy()) { 564 cipherOptions = getCipherOptions(); 565 } 566 sendSaslMessage(ctx, response, cipherOptions); 567 ctx.flush(); 568 step++; 569 break; 570 } 571 case 2: { 572 assert response == null; 573 checkSaslComplete(); 574 CipherOption cipherOption = 575 getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); 576 ChannelPipeline p = ctx.pipeline(); 577 while (p.first() != null) { 578 p.removeFirst(); 579 } 580 if (cipherOption != null) { 581 CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite()); 582 p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()), 583 new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv())); 584 } else { 585 if (useWrap()) { 586 p.addLast(new SaslWrapHandler(saslClient), 587 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), 588 new SaslUnwrapHandler(saslClient)); 589 } 590 } 591 promise.trySuccess(null); 592 break; 593 } 594 default: 595 throw new IllegalArgumentException("Unrecognized negotiation step: " + step); 596 } 597 } else { 598 ctx.fireChannelRead(msg); 599 } 600 } 601 602 @Override 603 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 604 promise.tryFailure(cause); 605 } 606 607 @Override 608 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 609 if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { 610 promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); 611 } else { 612 super.userEventTriggered(ctx, evt); 613 } 614 } 615 } 616 617 private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { 618 619 private final SaslClient saslClient; 620 621 public SaslUnwrapHandler(SaslClient saslClient) { 622 this.saslClient = saslClient; 623 } 624 625 @Override 626 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 627 saslClient.dispose(); 628 } 629 630 @Override 631 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 632 msg.skipBytes(4); 633 byte[] b = new byte[msg.readableBytes()]; 634 msg.readBytes(b); 635 ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); 636 } 637 } 638 639 private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { 640 641 private final SaslClient saslClient; 642 643 private CompositeByteBuf cBuf; 644 645 public SaslWrapHandler(SaslClient saslClient) { 646 this.saslClient = saslClient; 647 } 648 649 @Override 650 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 651 cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); 652 } 653 654 @Override 655 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 656 throws Exception { 657 if (msg instanceof ByteBuf) { 658 ByteBuf buf = (ByteBuf) msg; 659 cBuf.addComponent(buf); 660 cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); 661 } else { 662 safeWrite(ctx, msg); 663 } 664 } 665 666 @Override 667 public void flush(ChannelHandlerContext ctx) throws Exception { 668 if (cBuf.isReadable()) { 669 byte[] b = new byte[cBuf.readableBytes()]; 670 cBuf.readBytes(b); 671 cBuf.discardReadComponents(); 672 byte[] wrapped = saslClient.wrap(b, 0, b.length); 673 ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); 674 buf.writeInt(wrapped.length); 675 buf.writeBytes(wrapped); 676 safeWrite(ctx, buf); 677 } 678 ctx.flush(); 679 } 680 681 @Override 682 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 683 // Release buffer on removal. 684 cBuf.release(); 685 cBuf = null; 686 } 687 } 688 689 private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { 690 691 private final Decryptor decryptor; 692 693 public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 694 throws GeneralSecurityException, IOException { 695 this.decryptor = codec.createDecryptor(); 696 this.decryptor.init(key, Arrays.copyOf(iv, iv.length)); 697 } 698 699 @Override 700 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 701 ByteBuf inBuf; 702 boolean release = false; 703 if (msg.nioBufferCount() == 1) { 704 inBuf = msg; 705 } else { 706 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 707 msg.readBytes(inBuf); 708 release = true; 709 } 710 ByteBuffer inBuffer = inBuf.nioBuffer(); 711 ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); 712 ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes()); 713 decryptor.decrypt(inBuffer, outBuffer); 714 outBuf.writerIndex(inBuf.readableBytes()); 715 if (release) { 716 inBuf.release(); 717 } 718 ctx.fireChannelRead(outBuf); 719 } 720 } 721 722 private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { 723 724 private final Encryptor encryptor; 725 726 public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv) 727 throws GeneralSecurityException, IOException { 728 this.encryptor = codec.createEncryptor(); 729 this.encryptor.init(key, Arrays.copyOf(iv, iv.length)); 730 } 731 732 @Override 733 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) 734 throws Exception { 735 if (preferDirect) { 736 return ctx.alloc().directBuffer(msg.readableBytes()); 737 } else { 738 return ctx.alloc().buffer(msg.readableBytes()); 739 } 740 } 741 742 @Override 743 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { 744 ByteBuf inBuf; 745 boolean release = false; 746 if (msg.nioBufferCount() == 1) { 747 inBuf = msg; 748 } else { 749 inBuf = ctx.alloc().directBuffer(msg.readableBytes()); 750 msg.readBytes(inBuf); 751 release = true; 752 } 753 ByteBuffer inBuffer = inBuf.nioBuffer(); 754 ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes()); 755 encryptor.encrypt(inBuffer, outBuffer); 756 out.writerIndex(inBuf.readableBytes()); 757 if (release) { 758 inBuf.release(); 759 } 760 } 761 } 762 763 private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { 764 return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER 765 + Base64.getEncoder().encodeToString(encryptionKey.nonce); 766 } 767 768 private static char[] encryptionKeyToPassword(byte[] encryptionKey) { 769 return Base64.getEncoder().encodeToString(encryptionKey).toCharArray(); 770 } 771 772 private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { 773 return Base64.getEncoder().encodeToString(blockToken.getIdentifier()); 774 } 775 776 private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { 777 return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray(); 778 } 779 780 private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { 781 Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); 782 saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); 783 saslProps.put(Sasl.SERVER_AUTH, "true"); 784 saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); 785 return saslProps; 786 } 787 788 private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, 789 String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise, 790 DFSClient dfsClient) { 791 try { 792 channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), 793 new ProtobufVarint32FrameDecoder(), 794 new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), 795 new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise, 796 dfsClient)); 797 } catch (SaslException e) { 798 saslPromise.tryFailure(e); 799 } 800 } 801 802 static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, 803 int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, 804 Promise<Void> saslPromise) throws IOException { 805 SaslDataTransferClient saslClient = client.getSaslDataTransferClient(); 806 SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient); 807 TrustedChannelResolver trustedChannelResolver = 808 SASL_ADAPTOR.getTrustedChannelResolver(saslClient); 809 AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient); 810 InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); 811 if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { 812 saslPromise.trySuccess(null); 813 return; 814 } 815 DataEncryptionKey encryptionKey = client.newDataEncryptionKey(); 816 if (encryptionKey != null) { 817 if (LOG.isDebugEnabled()) { 818 LOG.debug( 819 "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo); 820 } 821 doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), 822 encryptionKeyToPassword(encryptionKey.encryptionKey), 823 createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, client); 824 } else if (!UserGroupInformation.isSecurityEnabled()) { 825 if (LOG.isDebugEnabled()) { 826 LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr 827 + ", datanodeId = " + dnInfo); 828 } 829 saslPromise.trySuccess(null); 830 } else if (dnInfo.getXferPort() < 1024) { 831 if (LOG.isDebugEnabled()) { 832 LOG.debug("SASL client skipping handshake in secured configuration with " 833 + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); 834 } 835 saslPromise.trySuccess(null); 836 } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { 837 if (LOG.isDebugEnabled()) { 838 LOG.debug("SASL client skipping handshake in secured configuration with " 839 + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); 840 } 841 saslPromise.trySuccess(null); 842 } else if (saslPropsResolver != null) { 843 if (LOG.isDebugEnabled()) { 844 LOG.debug( 845 "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo); 846 } 847 doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), 848 buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise, 849 client); 850 } else { 851 // It's a secured cluster using non-privileged ports, but no SASL. The only way this can 852 // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare 853 // edge case. 854 if (LOG.isDebugEnabled()) { 855 LOG.debug("SASL client skipping handshake in secured configuration with no SASL " 856 + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); 857 } 858 saslPromise.trySuccess(null); 859 } 860 } 861 862 static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client) 863 throws IOException { 864 FileEncryptionInfo feInfo = stat.getFileEncryptionInfo(); 865 if (feInfo == null) { 866 return null; 867 } 868 return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client); 869 } 870}