001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
022import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
023
024import java.io.IOException;
025import java.lang.reflect.Constructor;
026import java.lang.reflect.Field;
027import java.lang.reflect.InvocationTargetException;
028import java.lang.reflect.Method;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.nio.ByteBuffer;
032import java.security.GeneralSecurityException;
033import java.util.Arrays;
034import java.util.Base64;
035import java.util.Collections;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041import javax.security.auth.callback.Callback;
042import javax.security.auth.callback.CallbackHandler;
043import javax.security.auth.callback.NameCallback;
044import javax.security.auth.callback.PasswordCallback;
045import javax.security.auth.callback.UnsupportedCallbackException;
046import javax.security.sasl.RealmCallback;
047import javax.security.sasl.RealmChoiceCallback;
048import javax.security.sasl.Sasl;
049import javax.security.sasl.SaslClient;
050import javax.security.sasl.SaslException;
051import org.apache.commons.lang3.StringUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.crypto.CipherOption;
054import org.apache.hadoop.crypto.CipherSuite;
055import org.apache.hadoop.crypto.CryptoCodec;
056import org.apache.hadoop.crypto.Decryptor;
057import org.apache.hadoop.crypto.Encryptor;
058import org.apache.hadoop.crypto.key.KeyProvider;
059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
060import org.apache.hadoop.fs.FileEncryptionInfo;
061import org.apache.hadoop.hdfs.DFSClient;
062import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
072import org.apache.hadoop.security.SaslPropertiesResolver;
073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
074import org.apache.hadoop.security.UserGroupInformation;
075import org.apache.hadoop.security.token.Token;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
082import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
083import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
088import org.apache.hbase.thirdparty.io.netty.channel.Channel;
089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
100import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
101
102/**
103 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
104 */
105@InterfaceAudience.Private
106public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
107  private static final Logger LOG =
108    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
109
110  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
111  }
112
113  private static final String SERVER_NAME = "0";
114  private static final String PROTOCOL = "hdfs";
115  private static final String MECHANISM = "DIGEST-MD5";
116  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
117  private static final String NAME_DELIMITER = " ";
118
119  private interface SaslAdaptor {
120
121    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
122
123    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
124
125    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
126  }
127
128  private static final SaslAdaptor SASL_ADAPTOR;
129
130  private interface TransparentCryptoHelper {
131
132    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
133      throws IOException;
134  }
135
136  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
137
138  private static SaslAdaptor createSaslAdaptor()
139    throws NoSuchFieldException, NoSuchMethodException {
140    Field saslPropsResolverField =
141      SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
142    saslPropsResolverField.setAccessible(true);
143    Field trustedChannelResolverField =
144      SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
145    trustedChannelResolverField.setAccessible(true);
146    Field fallbackToSimpleAuthField =
147      SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
148    fallbackToSimpleAuthField.setAccessible(true);
149    return new SaslAdaptor() {
150
151      @Override
152      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
153        try {
154          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
155        } catch (IllegalAccessException e) {
156          throw new RuntimeException(e);
157        }
158      }
159
160      @Override
161      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
162        try {
163          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
164        } catch (IllegalAccessException e) {
165          throw new RuntimeException(e);
166        }
167      }
168
169      @Override
170      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
171        try {
172          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
173        } catch (IllegalAccessException e) {
174          throw new RuntimeException(e);
175        }
176      }
177    };
178  }
179
180  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
181    throws NoSuchMethodException {
182    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
183      .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
184    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
185    return new TransparentCryptoHelper() {
186
187      @Override
188      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
189        DFSClient client) throws IOException {
190        try {
191          KeyVersion decryptedKey =
192            (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
193          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
194          Encryptor encryptor = cryptoCodec.createEncryptor();
195          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
196          return encryptor;
197        } catch (InvocationTargetException e) {
198          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
199          throw new RuntimeException(e.getTargetException());
200        } catch (GeneralSecurityException e) {
201          throw new IOException(e);
202        } catch (IllegalAccessException e) {
203          throw new RuntimeException(e);
204        }
205      }
206    };
207  }
208
209  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
210    throws ClassNotFoundException, NoSuchMethodException {
211    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
212    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
213      "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
214    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
215    return new TransparentCryptoHelper() {
216
217      @Override
218      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
219        DFSClient client) throws IOException {
220        try {
221          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
222            .invoke(null, feInfo, client.getKeyProvider());
223          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
224          Encryptor encryptor = cryptoCodec.createEncryptor();
225          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
226          return encryptor;
227        } catch (InvocationTargetException e) {
228          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
229          throw new RuntimeException(e.getTargetException());
230        } catch (GeneralSecurityException e) {
231          throw new IOException(e);
232        } catch (IllegalAccessException e) {
233          throw new RuntimeException(e);
234        }
235      }
236    };
237  }
238
239  private static TransparentCryptoHelper createTransparentCryptoHelper()
240    throws NoSuchMethodException, ClassNotFoundException {
241    try {
242      return createTransparentCryptoHelperWithoutHDFS12396();
243    } catch (NoSuchMethodException e) {
244      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient,"
245        + " should be hadoop version with HDFS-12396", e);
246    }
247    return createTransparentCryptoHelperWithHDFS12396();
248  }
249
250  static {
251    try {
252      SASL_ADAPTOR = createSaslAdaptor();
253      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
254    } catch (Exception e) {
255      String msg = "Couldn't properly initialize access to HDFS internals. Please "
256        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
257        + "HBASE-16110 for more information.";
258      LOG.error(msg, e);
259      throw new Error(msg, e);
260    }
261  }
262
263  /**
264   * Sets user name and password when asked by the client-side SASL object.
265   */
266  private static final class SaslClientCallbackHandler implements CallbackHandler {
267
268    private final char[] password;
269    private final String userName;
270
271    /**
272     * Creates a new SaslClientCallbackHandler.
273     * @param userName SASL user name
274     * @param password SASL password
275     */
276    public SaslClientCallbackHandler(String userName, char[] password) {
277      this.password = password;
278      this.userName = userName;
279    }
280
281    @Override
282    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
283      NameCallback nc = null;
284      PasswordCallback pc = null;
285      RealmCallback rc = null;
286      for (Callback callback : callbacks) {
287        if (callback instanceof RealmChoiceCallback) {
288          continue;
289        } else if (callback instanceof NameCallback) {
290          nc = (NameCallback) callback;
291        } else if (callback instanceof PasswordCallback) {
292          pc = (PasswordCallback) callback;
293        } else if (callback instanceof RealmCallback) {
294          rc = (RealmCallback) callback;
295        } else {
296          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
297        }
298      }
299      if (nc != null) {
300        nc.setName(userName);
301      }
302      if (pc != null) {
303        pc.setPassword(password);
304      }
305      if (rc != null) {
306        rc.setText(rc.getDefaultText());
307      }
308    }
309  }
310
311  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
312
313    private final Configuration conf;
314
315    private final Map<String, String> saslProps;
316
317    private final SaslClient saslClient;
318
319    private final int timeoutMs;
320
321    private final Promise<Void> promise;
322
323    private final DFSClient dfsClient;
324
325    private int step = 0;
326
327    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
328      Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, DFSClient dfsClient)
329      throws SaslException {
330      this.conf = conf;
331      this.saslProps = saslProps;
332      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
333        SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
334      this.timeoutMs = timeoutMs;
335      this.promise = promise;
336      this.dfsClient = dfsClient;
337    }
338
339    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
340      sendSaslMessage(ctx, payload, null);
341    }
342
343    private List<CipherOption> getCipherOptions() throws IOException {
344      // Negotiate cipher suites if configured. Currently, the only supported
345      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
346      // values for future expansion.
347      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
348      if (StringUtils.isBlank(cipherSuites)) {
349        return null;
350      }
351      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
352        throw new IOException(String.format("Invalid cipher suite, %s=%s",
353          DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
354      }
355      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
356    }
357
358    /**
359     * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. After
360     * Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*.
361     * Use Reflection to check which ones to use.
362     */
363    private static class BuilderPayloadSetter {
364      private static Method setPayloadMethod;
365      private static Constructor<?> constructor;
366
367      /**
368       * Create a ByteString from byte array without copying (wrap), and then set it as the payload
369       * for the builder.
370       * @param builder builder for HDFS DataTransferEncryptorMessage.
371       * @param payload byte array of payload.
372       */
373      static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder,
374        byte[] payload) throws IOException {
375        Object byteStringObject;
376        try {
377          // byteStringObject = new LiteralByteString(payload);
378          byteStringObject = constructor.newInstance(payload);
379          // builder.setPayload(byteStringObject);
380          setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject));
381        } catch (IllegalAccessException | InstantiationException e) {
382          throw new RuntimeException(e);
383
384        } catch (InvocationTargetException e) {
385          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
386          throw new RuntimeException(e.getTargetException());
387        }
388      }
389
390      static {
391        Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class;
392
393        // Try the unrelocated ByteString
394        Class<?> byteStringClass;
395        try {
396          // See if it can load the relocated ByteString, which comes from hadoop-thirdparty.
397          byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
398          LOG.debug("Found relocated ByteString class from hadoop-thirdparty."
399            + " Assuming this is Hadoop 3.3.0+.");
400        } catch (ClassNotFoundException e) {
401          LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty."
402            + " Assuming this is below Hadoop 3.3.0", e);
403          try {
404            byteStringClass = Class.forName("com.google.protobuf.ByteString");
405            LOG.debug("com.google.protobuf.ByteString found.");
406          } catch (ClassNotFoundException ex) {
407            throw new RuntimeException(ex);
408          }
409        }
410
411        // LiteralByteString is a package private class in protobuf. Make it accessible.
412        Class<?> literalByteStringClass;
413        try {
414          literalByteStringClass =
415            Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
416          LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
417        } catch (ClassNotFoundException e) {
418          try {
419            literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString");
420            LOG.debug("com.google.protobuf.LiteralByteString found.");
421          } catch (ClassNotFoundException ex) {
422            throw new RuntimeException(ex);
423          }
424        }
425
426        try {
427          constructor = literalByteStringClass.getDeclaredConstructor(byte[].class);
428          constructor.setAccessible(true);
429        } catch (NoSuchMethodException e) {
430          throw new RuntimeException(e);
431        }
432
433        try {
434          setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass);
435        } catch (NoSuchMethodException e) {
436          // if either method is not found, we are in big trouble. Abort.
437          throw new RuntimeException(e);
438        }
439      }
440    }
441
442    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
443      List<CipherOption> options) throws IOException {
444      DataTransferEncryptorMessageProto.Builder builder =
445        DataTransferEncryptorMessageProto.newBuilder();
446      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
447      if (payload != null) {
448        BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
449      }
450      if (options != null) {
451        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
452      }
453      DataTransferEncryptorMessageProto proto = builder.build();
454      int size = proto.getSerializedSize();
455      size += CodedOutputStream.computeUInt32SizeNoTag(size);
456      ByteBuf buf = ctx.alloc().buffer(size);
457      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
458      safeWrite(ctx, buf);
459    }
460
461    @Override
462    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
463      safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
464      sendSaslMessage(ctx, new byte[0]);
465      ctx.flush();
466      step++;
467    }
468
469    @Override
470    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
471      saslClient.dispose();
472    }
473
474    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
475      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
476        dfsClient.clearDataEncryptionKey();
477        throw new InvalidEncryptionKeyException(proto.getMessage());
478      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
479        throw new IOException(proto.getMessage());
480      }
481    }
482
483    private String getNegotiatedQop() {
484      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
485    }
486
487    private boolean isNegotiatedQopPrivacy() {
488      String qop = getNegotiatedQop();
489      return qop != null && "auth-conf".equalsIgnoreCase(qop);
490    }
491
492    private boolean requestedQopContainsPrivacy() {
493      Set<String> requestedQop =
494        ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
495      return requestedQop.contains("auth-conf");
496    }
497
498    private void checkSaslComplete() throws IOException {
499      if (!saslClient.isComplete()) {
500        throw new IOException("Failed to complete SASL handshake");
501      }
502      Set<String> requestedQop =
503        ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
504      String negotiatedQop = getNegotiatedQop();
505      LOG.debug(
506        "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
507      if (!requestedQop.contains(negotiatedQop)) {
508        throw new IOException(String.format("SASL handshake completed, but "
509          + "channel does not have acceptable quality of protection, "
510          + "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
511      }
512    }
513
514    private boolean useWrap() {
515      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
516      return qop != null && !"auth".equalsIgnoreCase(qop);
517    }
518
519    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
520      byte[] inKey = option.getInKey();
521      if (inKey != null) {
522        inKey = saslClient.unwrap(inKey, 0, inKey.length);
523      }
524      byte[] outKey = option.getOutKey();
525      if (outKey != null) {
526        outKey = saslClient.unwrap(outKey, 0, outKey.length);
527      }
528      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
529        option.getOutIv());
530    }
531
532    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
533      boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
534      List<CipherOption> cipherOptions =
535        PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
536      if (cipherOptions == null || cipherOptions.isEmpty()) {
537        return null;
538      }
539      CipherOption cipherOption = cipherOptions.get(0);
540      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
541    }
542
543    @Override
544    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
545      if (msg instanceof DataTransferEncryptorMessageProto) {
546        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
547        check(proto);
548        byte[] challenge = proto.getPayload().toByteArray();
549        byte[] response = saslClient.evaluateChallenge(challenge);
550        switch (step) {
551          case 1: {
552            List<CipherOption> cipherOptions = null;
553            if (requestedQopContainsPrivacy()) {
554              cipherOptions = getCipherOptions();
555            }
556            sendSaslMessage(ctx, response, cipherOptions);
557            ctx.flush();
558            step++;
559            break;
560          }
561          case 2: {
562            assert response == null;
563            checkSaslComplete();
564            CipherOption cipherOption =
565              getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
566            ChannelPipeline p = ctx.pipeline();
567            while (p.first() != null) {
568              p.removeFirst();
569            }
570            if (cipherOption != null) {
571              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
572              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
573                new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
574            } else {
575              if (useWrap()) {
576                p.addLast(new SaslWrapHandler(saslClient),
577                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
578                  new SaslUnwrapHandler(saslClient));
579              }
580            }
581            promise.trySuccess(null);
582            break;
583          }
584          default:
585            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
586        }
587      } else {
588        ctx.fireChannelRead(msg);
589      }
590    }
591
592    @Override
593    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
594      promise.tryFailure(cause);
595    }
596
597    @Override
598    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
599      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
600        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
601      } else {
602        super.userEventTriggered(ctx, evt);
603      }
604    }
605  }
606
607  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
608
609    private final SaslClient saslClient;
610
611    public SaslUnwrapHandler(SaslClient saslClient) {
612      this.saslClient = saslClient;
613    }
614
615    @Override
616    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
617      saslClient.dispose();
618    }
619
620    @Override
621    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
622      msg.skipBytes(4);
623      byte[] b = new byte[msg.readableBytes()];
624      msg.readBytes(b);
625      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
626    }
627  }
628
629  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
630
631    private final SaslClient saslClient;
632
633    private CompositeByteBuf cBuf;
634
635    public SaslWrapHandler(SaslClient saslClient) {
636      this.saslClient = saslClient;
637    }
638
639    @Override
640    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
641      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
642    }
643
644    @Override
645    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
646      throws Exception {
647      if (msg instanceof ByteBuf) {
648        ByteBuf buf = (ByteBuf) msg;
649        cBuf.addComponent(buf);
650        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
651      } else {
652        safeWrite(ctx, msg);
653      }
654    }
655
656    @Override
657    public void flush(ChannelHandlerContext ctx) throws Exception {
658      if (cBuf.isReadable()) {
659        byte[] b = new byte[cBuf.readableBytes()];
660        cBuf.readBytes(b);
661        cBuf.discardReadComponents();
662        byte[] wrapped = saslClient.wrap(b, 0, b.length);
663        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
664        buf.writeInt(wrapped.length);
665        buf.writeBytes(wrapped);
666        safeWrite(ctx, buf);
667      }
668      ctx.flush();
669    }
670
671    @Override
672    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
673      // Release buffer on removal.
674      cBuf.release();
675      cBuf = null;
676    }
677  }
678
679  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
680
681    private final Decryptor decryptor;
682
683    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
684      throws GeneralSecurityException, IOException {
685      this.decryptor = codec.createDecryptor();
686      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
687    }
688
689    @Override
690    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
691      ByteBuf inBuf;
692      boolean release = false;
693      if (msg.nioBufferCount() == 1) {
694        inBuf = msg;
695      } else {
696        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
697        msg.readBytes(inBuf);
698        release = true;
699      }
700      ByteBuffer inBuffer = inBuf.nioBuffer();
701      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
702      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
703      decryptor.decrypt(inBuffer, outBuffer);
704      outBuf.writerIndex(inBuf.readableBytes());
705      if (release) {
706        inBuf.release();
707      }
708      ctx.fireChannelRead(outBuf);
709    }
710  }
711
712  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
713
714    private final Encryptor encryptor;
715
716    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
717      throws GeneralSecurityException, IOException {
718      this.encryptor = codec.createEncryptor();
719      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
720    }
721
722    @Override
723    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
724      throws Exception {
725      if (preferDirect) {
726        return ctx.alloc().directBuffer(msg.readableBytes());
727      } else {
728        return ctx.alloc().buffer(msg.readableBytes());
729      }
730    }
731
732    @Override
733    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
734      ByteBuf inBuf;
735      boolean release = false;
736      if (msg.nioBufferCount() == 1) {
737        inBuf = msg;
738      } else {
739        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
740        msg.readBytes(inBuf);
741        release = true;
742      }
743      ByteBuffer inBuffer = inBuf.nioBuffer();
744      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
745      encryptor.encrypt(inBuffer, outBuffer);
746      out.writerIndex(inBuf.readableBytes());
747      if (release) {
748        inBuf.release();
749      }
750    }
751  }
752
753  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
754    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
755      + Base64.getEncoder().encodeToString(encryptionKey.nonce);
756  }
757
758  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
759    return Base64.getEncoder().encodeToString(encryptionKey).toCharArray();
760  }
761
762  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
763    return Base64.getEncoder().encodeToString(blockToken.getIdentifier());
764  }
765
766  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
767    return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray();
768  }
769
770  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
771    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
772    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
773    saslProps.put(Sasl.SERVER_AUTH, "true");
774    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
775    return saslProps;
776  }
777
778  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
779    String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
780    DFSClient dfsClient) {
781    try {
782      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
783        new ProtobufVarint32FrameDecoder(),
784        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
785        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
786          dfsClient));
787    } catch (SaslException e) {
788      saslPromise.tryFailure(e);
789    }
790  }
791
792  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
793    int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
794    Promise<Void> saslPromise) throws IOException {
795    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
796    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
797    TrustedChannelResolver trustedChannelResolver =
798      SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
799    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
800    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
801    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
802      saslPromise.trySuccess(null);
803      return;
804    }
805    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
806    if (encryptionKey != null) {
807      if (LOG.isDebugEnabled()) {
808        LOG.debug(
809          "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
810      }
811      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
812        encryptionKeyToPassword(encryptionKey.encryptionKey),
813        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, client);
814    } else if (!UserGroupInformation.isSecurityEnabled()) {
815      if (LOG.isDebugEnabled()) {
816        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
817          + ", datanodeId = " + dnInfo);
818      }
819      saslPromise.trySuccess(null);
820    } else if (dnInfo.getXferPort() < 1024) {
821      if (LOG.isDebugEnabled()) {
822        LOG.debug("SASL client skipping handshake in secured configuration with "
823          + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
824      }
825      saslPromise.trySuccess(null);
826    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
827      if (LOG.isDebugEnabled()) {
828        LOG.debug("SASL client skipping handshake in secured configuration with "
829          + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
830      }
831      saslPromise.trySuccess(null);
832    } else if (saslPropsResolver != null) {
833      if (LOG.isDebugEnabled()) {
834        LOG.debug(
835          "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
836      }
837      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
838        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
839        client);
840    } else {
841      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
842      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
843      // edge case.
844      if (LOG.isDebugEnabled()) {
845        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
846          + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
847      }
848      saslPromise.trySuccess(null);
849    }
850  }
851
852  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
853    throws IOException {
854    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
855    if (feInfo == null) {
856      return null;
857    }
858    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
859  }
860}