001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.asyncfs;
019
020import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
021import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
022import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
023
024import java.io.IOException;
025import java.lang.reflect.Constructor;
026import java.lang.reflect.Field;
027import java.lang.reflect.InvocationTargetException;
028import java.lang.reflect.Method;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.nio.ByteBuffer;
032import java.security.GeneralSecurityException;
033import java.util.Arrays;
034import java.util.Base64;
035import java.util.Collections;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041import javax.security.auth.callback.Callback;
042import javax.security.auth.callback.CallbackHandler;
043import javax.security.auth.callback.NameCallback;
044import javax.security.auth.callback.PasswordCallback;
045import javax.security.auth.callback.UnsupportedCallbackException;
046import javax.security.sasl.RealmCallback;
047import javax.security.sasl.RealmChoiceCallback;
048import javax.security.sasl.Sasl;
049import javax.security.sasl.SaslClient;
050import javax.security.sasl.SaslException;
051import org.apache.commons.lang3.StringUtils;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.crypto.CipherOption;
054import org.apache.hadoop.crypto.CipherSuite;
055import org.apache.hadoop.crypto.CryptoCodec;
056import org.apache.hadoop.crypto.Decryptor;
057import org.apache.hadoop.crypto.Encryptor;
058import org.apache.hadoop.crypto.key.KeyProvider;
059import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
060import org.apache.hadoop.fs.FileEncryptionInfo;
061import org.apache.hadoop.hdfs.DFSClient;
062import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
063import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
064import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
065import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
066import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
067import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
068import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
069import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
070import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
071import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
072import org.apache.hadoop.security.SaslPropertiesResolver;
073import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
074import org.apache.hadoop.security.UserGroupInformation;
075import org.apache.hadoop.security.token.Token;
076import org.apache.yetus.audience.InterfaceAudience;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
081import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
082import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
083import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
084import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
085import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
086import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
087import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
088import org.apache.hbase.thirdparty.io.netty.channel.Channel;
089import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
090import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
091import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
092import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
093import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
094import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
095import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
096import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
097import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
098import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
099import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
100import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
101
102/**
103 * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
104 */
105@InterfaceAudience.Private
106public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
  private static final Logger LOG =
    LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);

  // Private constructor: this final class only exposes static members.
  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
  }

  // Server name handed to Sasl.createSaslClient.
  private static final String SERVER_NAME = "0";
  // Protocol name handed to Sasl.createSaslClient.
  private static final String PROTOCOL = "hdfs";
  // The only SASL mechanism offered to Sasl.createSaslClient; taken from Hadoop's TOKEN auth method.
  private static final String MECHANISM =
    org.apache.hadoop.security.SaslRpcServer.AuthMethod.TOKEN.getMechanismName();
  // Written as a 4-byte int before the first SASL message when the negotiate handler is added.
  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
  // Separator between the parts of the user name derived from a DataEncryptionKey.
  private static final String NAME_DELIMITER = " ";
119
  /**
   * Read access to three fields of {@link SaslDataTransferClient}. The implementation created by
   * {@code createSaslAdaptor()} reads them through reflection.
   */
  private interface SaslAdaptor {

    /** Returns the value of the {@code trustedChannelResolver} field of {@code saslClient}. */
    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);

    /** Returns the value of the {@code saslPropsResolver} field of {@code saslClient}. */
    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);

    /** Returns the value of the {@code fallbackToSimpleAuth} field of {@code saslClient}. */
    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
  }

  // Assigned exactly once, in the static initializer of this class.
  private static final SaslAdaptor SASL_ADAPTOR;
130
  /**
   * Creates the {@link Encryptor} for an encrypted file. Two reflective implementations exist in
   * this class, one for Hadoop versions without HDFS-12396 and one for versions with it.
   */
  private interface TransparentCryptoHelper {

    /**
     * Builds an encryptor for the file described by {@code feInfo}.
     * @param conf   configuration used to look up the {@link CryptoCodec}
     * @param feInfo encryption info of the file; supplies the cipher suite and the IV
     * @param client client used to decrypt the encrypted data encryption key
     * @return an initialized encryptor
     * @throws IOException if the key cannot be decrypted or the encryptor cannot be set up
     */
    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
      throws IOException;
  }

  // Assigned exactly once, in the static initializer of this class.
  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
138
139  private static SaslAdaptor createSaslAdaptor()
140    throws NoSuchFieldException, NoSuchMethodException {
141    Field saslPropsResolverField =
142      SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
143    saslPropsResolverField.setAccessible(true);
144    Field trustedChannelResolverField =
145      SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
146    trustedChannelResolverField.setAccessible(true);
147    Field fallbackToSimpleAuthField =
148      SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
149    fallbackToSimpleAuthField.setAccessible(true);
150    return new SaslAdaptor() {
151
152      @Override
153      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
154        try {
155          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
156        } catch (IllegalAccessException e) {
157          throw new RuntimeException(e);
158        }
159      }
160
161      @Override
162      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
163        try {
164          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
165        } catch (IllegalAccessException e) {
166          throw new RuntimeException(e);
167        }
168      }
169
170      @Override
171      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
172        try {
173          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
174        } catch (IllegalAccessException e) {
175          throw new RuntimeException(e);
176        }
177      }
178    };
179  }
180
181  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
182    throws NoSuchMethodException {
183    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
184      .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
185    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
186    return new TransparentCryptoHelper() {
187
188      @Override
189      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
190        DFSClient client) throws IOException {
191        try {
192          KeyVersion decryptedKey =
193            (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
194          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
195          Encryptor encryptor = cryptoCodec.createEncryptor();
196          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
197          return encryptor;
198        } catch (InvocationTargetException e) {
199          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
200          throw new RuntimeException(e.getTargetException());
201        } catch (GeneralSecurityException e) {
202          throw new IOException(e);
203        } catch (IllegalAccessException e) {
204          throw new RuntimeException(e);
205        }
206      }
207    };
208  }
209
210  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
211    throws ClassNotFoundException, NoSuchMethodException {
212    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
213    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
214      "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
215    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
216    return new TransparentCryptoHelper() {
217
218      @Override
219      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
220        DFSClient client) throws IOException {
221        try {
222          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
223            .invoke(null, feInfo, client.getKeyProvider());
224          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
225          Encryptor encryptor = cryptoCodec.createEncryptor();
226          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
227          return encryptor;
228        } catch (InvocationTargetException e) {
229          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
230          throw new RuntimeException(e.getTargetException());
231        } catch (GeneralSecurityException e) {
232          throw new IOException(e);
233        } catch (IllegalAccessException e) {
234          throw new RuntimeException(e);
235        }
236      }
237    };
238  }
239
240  private static TransparentCryptoHelper createTransparentCryptoHelper()
241    throws NoSuchMethodException, ClassNotFoundException {
242    try {
243      return createTransparentCryptoHelperWithoutHDFS12396();
244    } catch (NoSuchMethodException e) {
245      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient,"
246        + " should be hadoop version with HDFS-12396", e);
247    }
248    return createTransparentCryptoHelperWithHDFS12396();
249  }
250
251  static {
252    try {
253      SASL_ADAPTOR = createSaslAdaptor();
254      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
255    } catch (Exception e) {
256      String msg = "Couldn't properly initialize access to HDFS internals. Please "
257        + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
258        + "HBASE-16110 for more information.";
259      LOG.error(msg, e);
260      throw new Error(msg, e);
261    }
262  }
263
264  /**
265   * Sets user name and password when asked by the client-side SASL object.
266   */
267  private static final class SaslClientCallbackHandler implements CallbackHandler {
268
269    private final char[] password;
270    private final String userName;
271
272    /**
273     * Creates a new SaslClientCallbackHandler.
274     * @param userName SASL user name
275     * @param password SASL password
276     */
277    public SaslClientCallbackHandler(String userName, char[] password) {
278      this.password = password;
279      this.userName = userName;
280    }
281
282    @Override
283    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
284      NameCallback nc = null;
285      PasswordCallback pc = null;
286      RealmCallback rc = null;
287      for (Callback callback : callbacks) {
288        if (callback instanceof RealmChoiceCallback) {
289          continue;
290        } else if (callback instanceof NameCallback) {
291          nc = (NameCallback) callback;
292        } else if (callback instanceof PasswordCallback) {
293          pc = (PasswordCallback) callback;
294        } else if (callback instanceof RealmCallback) {
295          rc = (RealmCallback) callback;
296        } else {
297          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
298        }
299      }
300      if (nc != null) {
301        nc.setName(userName);
302      }
303      if (pc != null) {
304        pc.setPassword(password);
305      }
306      if (rc != null) {
307        rc.setText(rc.getDefaultText());
308      }
309    }
310  }
311
312  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
313
314    private final Configuration conf;
315
316    private final Map<String, String> saslProps;
317
318    private final SaslClient saslClient;
319
320    private final int timeoutMs;
321
322    private final Promise<Void> promise;
323
324    private final DFSClient dfsClient;
325
326    private int step = 0;
327
328    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
329      Map<String, String> saslProps, int timeoutMs, Promise<Void> promise, DFSClient dfsClient)
330      throws SaslException {
331      this.conf = conf;
332      this.saslProps = saslProps;
333      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
334        SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
335      this.timeoutMs = timeoutMs;
336      this.promise = promise;
337      this.dfsClient = dfsClient;
338    }
339
    /**
     * Sends a SASL message that carries only a payload, without any cipher options.
     * @param payload bytes to put into the message
     */
    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
      sendSaslMessage(ctx, payload, null);
    }
343
344    private List<CipherOption> getCipherOptions() throws IOException {
345      // Negotiate cipher suites if configured. Currently, the only supported
346      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
347      // values for future expansion.
348      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
349      if (StringUtils.isBlank(cipherSuites)) {
350        return null;
351      }
352      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
353        throw new IOException(String.format("Invalid cipher suite, %s=%s",
354          DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
355      }
356      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
357    }
358
359    /**
360     * The asyncfs subsystem emulates a HDFS client by sending protobuf messages via netty. After
361     * Hadoop 3.3.0, the protobuf classes are relocated to org.apache.hadoop.thirdparty.protobuf.*.
362     * Use Reflection to check which ones to use.
363     */
364    private static class BuilderPayloadSetter {
365      private static Method setPayloadMethod;
366      private static Constructor<?> constructor;
367
368      /**
369       * Create a ByteString from byte array without copying (wrap), and then set it as the payload
370       * for the builder.
371       * @param builder builder for HDFS DataTransferEncryptorMessage.
372       * @param payload byte array of payload.
373       */
374      static void wrapAndSetPayload(DataTransferEncryptorMessageProto.Builder builder,
375        byte[] payload) throws IOException {
376        Object byteStringObject;
377        try {
378          // byteStringObject = new LiteralByteString(payload);
379          byteStringObject = constructor.newInstance(payload);
380          // builder.setPayload(byteStringObject);
381          setPayloadMethod.invoke(builder, constructor.getDeclaringClass().cast(byteStringObject));
382        } catch (IllegalAccessException | InstantiationException e) {
383          throw new RuntimeException(e);
384
385        } catch (InvocationTargetException e) {
386          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
387          throw new RuntimeException(e.getTargetException());
388        }
389      }
390
391      static {
392        Class<?> builderClass = DataTransferEncryptorMessageProto.Builder.class;
393
394        // Try the unrelocated ByteString
395        Class<?> byteStringClass;
396        try {
397          // See if it can load the relocated ByteString, which comes from hadoop-thirdparty.
398          byteStringClass = Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString");
399          LOG.debug("Found relocated ByteString class from hadoop-thirdparty."
400            + " Assuming this is Hadoop 3.3.0+.");
401        } catch (ClassNotFoundException e) {
402          LOG.debug("Did not find relocated ByteString class from hadoop-thirdparty."
403            + " Assuming this is below Hadoop 3.3.0", e);
404          try {
405            byteStringClass = Class.forName("com.google.protobuf.ByteString");
406            LOG.debug("com.google.protobuf.ByteString found.");
407          } catch (ClassNotFoundException ex) {
408            throw new RuntimeException(ex);
409          }
410        }
411
412        // LiteralByteString is a package private class in protobuf. Make it accessible.
413        Class<?> literalByteStringClass;
414        try {
415          literalByteStringClass =
416            Class.forName("org.apache.hadoop.thirdparty.protobuf.ByteString$LiteralByteString");
417          LOG.debug("Shaded LiteralByteString from hadoop-thirdparty is found.");
418        } catch (ClassNotFoundException e) {
419          try {
420            literalByteStringClass = Class.forName("com.google.protobuf.LiteralByteString");
421            LOG.debug("com.google.protobuf.LiteralByteString found.");
422          } catch (ClassNotFoundException ex) {
423            throw new RuntimeException(ex);
424          }
425        }
426
427        try {
428          constructor = literalByteStringClass.getDeclaredConstructor(byte[].class);
429          constructor.setAccessible(true);
430        } catch (NoSuchMethodException e) {
431          throw new RuntimeException(e);
432        }
433
434        try {
435          setPayloadMethod = builderClass.getMethod("setPayload", byteStringClass);
436        } catch (NoSuchMethodException e) {
437          // if either method is not found, we are in big trouble. Abort.
438          throw new RuntimeException(e);
439        }
440      }
441    }
442
443    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
444      List<CipherOption> options) throws IOException {
445      DataTransferEncryptorMessageProto.Builder builder =
446        DataTransferEncryptorMessageProto.newBuilder();
447      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
448      if (payload != null) {
449        BuilderPayloadSetter.wrapAndSetPayload(builder, payload);
450      }
451      if (options != null) {
452        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
453      }
454      DataTransferEncryptorMessageProto proto = builder.build();
455      int size = proto.getSerializedSize();
456      size += CodedOutputStream.computeUInt32SizeNoTag(size);
457      ByteBuf buf = ctx.alloc().buffer(size);
458      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
459      safeWrite(ctx, buf);
460    }
461
462    @Override
463    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
464      safeWrite(ctx, ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
465      byte[] firstMessage = new byte[0];
466      if (saslClient.hasInitialResponse()) {
467        firstMessage = saslClient.evaluateChallenge(firstMessage);
468      }
469      sendSaslMessage(ctx, firstMessage);
470      ctx.flush();
471      step++;
472    }
473
474    @Override
475    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
476      saslClient.dispose();
477    }
478
479    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
480      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
481        dfsClient.clearDataEncryptionKey();
482        throw new InvalidEncryptionKeyException(proto.getMessage());
483      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
484        throw new IOException(proto.getMessage());
485      }
486    }
487
488    private String getNegotiatedQop() {
489      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
490    }
491
492    private boolean isNegotiatedQopPrivacy() {
493      String qop = getNegotiatedQop();
494      return qop != null && "auth-conf".equalsIgnoreCase(qop);
495    }
496
497    private boolean requestedQopContainsPrivacy() {
498      Set<String> requestedQop =
499        ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
500      return requestedQop.contains("auth-conf");
501    }
502
503    private void checkSaslComplete() throws IOException {
504      if (!saslClient.isComplete()) {
505        throw new IOException("Failed to complete SASL handshake");
506      }
507      Set<String> requestedQop =
508        ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
509      String negotiatedQop = getNegotiatedQop();
510      // Treat null negotiated QOP as "auth" for the purpose of verification
511      // Code elsewhere does the same implicitly
512      if (negotiatedQop == null) {
513        negotiatedQop = "auth";
514      }
515      LOG.debug(
516        "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
517      if (!requestedQop.contains(negotiatedQop)) {
518        throw new IOException(String.format("SASL handshake completed, but "
519          + "channel does not have acceptable quality of protection, "
520          + "requested = %s, negotiated(effective) = %s", requestedQop, negotiatedQop));
521      }
522    }
523
524    private boolean useWrap() {
525      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
526      return qop != null && !"auth".equalsIgnoreCase(qop);
527    }
528
529    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
530      byte[] inKey = option.getInKey();
531      if (inKey != null) {
532        inKey = saslClient.unwrap(inKey, 0, inKey.length);
533      }
534      byte[] outKey = option.getOutKey();
535      if (outKey != null) {
536        outKey = saslClient.unwrap(outKey, 0, outKey.length);
537      }
538      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
539        option.getOutIv());
540    }
541
542    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
543      boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
544      List<CipherOption> cipherOptions =
545        PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
546      if (cipherOptions == null || cipherOptions.isEmpty()) {
547        return null;
548      }
549      CipherOption cipherOption = cipherOptions.get(0);
550      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
551    }
552
    /**
     * Advances the handshake for every {@link DataTransferEncryptorMessageProto} received; any
     * other message is handed to the next handler untouched.
     * <p>
     * Each proto is validated by {@code check} and its payload is evaluated by the SASL client
     * before {@code step} is consulted. In step 1 the response is sent back, together with the
     * configured cipher options if privacy was requested. In step 2 the handshake is verified,
     * the pipeline is emptied and re-populated with the handlers matching the negotiated
     * protection, and the promise is completed. Any other step is rejected with an
     * {@link IllegalArgumentException}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      if (msg instanceof DataTransferEncryptorMessageProto) {
        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
        // Throws on ERROR / ERROR_UNKNOWN_KEY, so nothing below runs for an error response.
        check(proto);
        byte[] challenge = proto.getPayload().toByteArray();
        byte[] response = saslClient.evaluateChallenge(challenge);
        switch (step) {
          case 1: {
            // Cipher options are only offered when auth-conf is among the requested QOPs.
            List<CipherOption> cipherOptions = null;
            if (requestedQopContainsPrivacy()) {
              cipherOptions = getCipherOptions();
            }
            sendSaslMessage(ctx, response, cipherOptions);
            ctx.flush();
            step++;
            break;
          }
          case 2: {
            assert response == null;
            checkSaslComplete();
            CipherOption cipherOption =
              getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
            // Drop every handler currently in the pipeline, this one included.
            ChannelPipeline p = ctx.pipeline();
            while (p.first() != null) {
              p.removeFirst();
            }
            if (cipherOption != null) {
              // A cipher was agreed on: the encryptor uses the "in" key/IV, the decryptor the
              // "out" key/IV of the option.
              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
                new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
            } else {
              // No cipher: fall back to SASL wrap/unwrap with 4-byte length framing, but only
              // if the negotiated QOP requires it. Otherwise the pipeline stays empty.
              if (useWrap()) {
                p.addLast(new SaslWrapHandler(saslClient),
                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
                  new SaslUnwrapHandler(saslClient));
              }
            }
            promise.trySuccess(null);
            break;
          }
          default:
            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
        }
      } else {
        ctx.fireChannelRead(msg);
      }
    }
601
    /**
     * Fails the negotiation promise with the given cause. The exception is not forwarded to the
     * next handler, and nothing happens if the promise is already completed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      promise.tryFailure(cause);
    }
606
607    @Override
608    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
609      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
610        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
611      } else {
612        super.userEventTriggered(ctx, evt);
613      }
614    }
615  }
616
617  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
618
619    private final SaslClient saslClient;
620
621    public SaslUnwrapHandler(SaslClient saslClient) {
622      this.saslClient = saslClient;
623    }
624
625    @Override
626    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
627      saslClient.dispose();
628    }
629
630    @Override
631    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
632      msg.skipBytes(4);
633      byte[] b = new byte[msg.readableBytes()];
634      msg.readBytes(b);
635      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
636    }
637  }
638
639  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
640
641    private final SaslClient saslClient;
642
643    private CompositeByteBuf cBuf;
644
645    public SaslWrapHandler(SaslClient saslClient) {
646      this.saslClient = saslClient;
647    }
648
649    @Override
650    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
651      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
652    }
653
654    @Override
655    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
656      throws Exception {
657      if (msg instanceof ByteBuf) {
658        ByteBuf buf = (ByteBuf) msg;
659        cBuf.addComponent(buf);
660        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
661      } else {
662        safeWrite(ctx, msg);
663      }
664    }
665
666    @Override
667    public void flush(ChannelHandlerContext ctx) throws Exception {
668      if (cBuf.isReadable()) {
669        byte[] b = new byte[cBuf.readableBytes()];
670        cBuf.readBytes(b);
671        cBuf.discardReadComponents();
672        byte[] wrapped = saslClient.wrap(b, 0, b.length);
673        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
674        buf.writeInt(wrapped.length);
675        buf.writeBytes(wrapped);
676        safeWrite(ctx, buf);
677      }
678      ctx.flush();
679    }
680
681    @Override
682    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
683      // Release buffer on removal.
684      cBuf.release();
685      cBuf = null;
686    }
687  }
688
689  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
690
691    private final Decryptor decryptor;
692
693    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
694      throws GeneralSecurityException, IOException {
695      this.decryptor = codec.createDecryptor();
696      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
697    }
698
699    @Override
700    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
701      ByteBuf inBuf;
702      boolean release = false;
703      if (msg.nioBufferCount() == 1) {
704        inBuf = msg;
705      } else {
706        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
707        msg.readBytes(inBuf);
708        release = true;
709      }
710      ByteBuffer inBuffer = inBuf.nioBuffer();
711      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
712      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
713      decryptor.decrypt(inBuffer, outBuffer);
714      outBuf.writerIndex(inBuf.readableBytes());
715      if (release) {
716        inBuf.release();
717      }
718      ctx.fireChannelRead(outBuf);
719    }
720  }
721
722  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
723
724    private final Encryptor encryptor;
725
726    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
727      throws GeneralSecurityException, IOException {
728      this.encryptor = codec.createEncryptor();
729      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
730    }
731
732    @Override
733    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
734      throws Exception {
735      if (preferDirect) {
736        return ctx.alloc().directBuffer(msg.readableBytes());
737      } else {
738        return ctx.alloc().buffer(msg.readableBytes());
739      }
740    }
741
742    @Override
743    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
744      ByteBuf inBuf;
745      boolean release = false;
746      if (msg.nioBufferCount() == 1) {
747        inBuf = msg;
748      } else {
749        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
750        msg.readBytes(inBuf);
751        release = true;
752      }
753      ByteBuffer inBuffer = inBuf.nioBuffer();
754      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
755      encryptor.encrypt(inBuffer, outBuffer);
756      out.writerIndex(inBuf.readableBytes());
757      if (release) {
758        inBuf.release();
759      }
760    }
761  }
762
763  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
764    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
765      + Base64.getEncoder().encodeToString(encryptionKey.nonce);
766  }
767
768  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
769    return Base64.getEncoder().encodeToString(encryptionKey).toCharArray();
770  }
771
772  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
773    return Base64.getEncoder().encodeToString(blockToken.getIdentifier());
774  }
775
776  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
777    return Base64.getEncoder().encodeToString(blockToken.getPassword()).toCharArray();
778  }
779
780  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
781    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
782    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
783    saslProps.put(Sasl.SERVER_AUTH, "true");
784    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
785    return saslProps;
786  }
787
788  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
789    String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
790    DFSClient dfsClient) {
791    try {
792      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
793        new ProtobufVarint32FrameDecoder(),
794        new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
795        new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
796          dfsClient));
797    } catch (SaslException e) {
798      saslPromise.tryFailure(e);
799    }
800  }
801
802  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
803    int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
804    Promise<Void> saslPromise) throws IOException {
805    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
806    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
807    TrustedChannelResolver trustedChannelResolver =
808      SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
809    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
810    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
811    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
812      saslPromise.trySuccess(null);
813      return;
814    }
815    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
816    if (encryptionKey != null) {
817      if (LOG.isDebugEnabled()) {
818        LOG.debug(
819          "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
820      }
821      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
822        encryptionKeyToPassword(encryptionKey.encryptionKey),
823        createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise, client);
824    } else if (!UserGroupInformation.isSecurityEnabled()) {
825      if (LOG.isDebugEnabled()) {
826        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
827          + ", datanodeId = " + dnInfo);
828      }
829      saslPromise.trySuccess(null);
830    } else if (dnInfo.getXferPort() < 1024) {
831      if (LOG.isDebugEnabled()) {
832        LOG.debug("SASL client skipping handshake in secured configuration with "
833          + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
834      }
835      saslPromise.trySuccess(null);
836    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
837      if (LOG.isDebugEnabled()) {
838        LOG.debug("SASL client skipping handshake in secured configuration with "
839          + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
840      }
841      saslPromise.trySuccess(null);
842    } else if (saslPropsResolver != null) {
843      if (LOG.isDebugEnabled()) {
844        LOG.debug(
845          "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
846      }
847      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
848        buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
849        client);
850    } else {
851      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
852      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
853      // edge case.
854      if (LOG.isDebugEnabled()) {
855        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
856          + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
857      }
858      saslPromise.trySuccess(null);
859    }
860  }
861
862  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
863    throws IOException {
864    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
865    if (feInfo == null) {
866      return null;
867    }
868    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
869  }
870}