001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security;
019
020import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
021import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
022import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
023import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
024import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal;
025import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
026import static org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders.SELECTOR_KEY;
027import static org.hamcrest.MatcherAssert.assertThat;
028import static org.hamcrest.Matchers.either;
029import static org.hamcrest.Matchers.instanceOf;
030import static org.junit.Assert.assertEquals;
031import static org.junit.Assert.assertNotSame;
032import static org.junit.Assert.assertSame;
033import static org.junit.Assert.assertThrows;
034import static org.junit.Assert.fail;
035
036import java.io.EOFException;
037import java.io.File;
038import java.io.IOException;
039import java.lang.reflect.Field;
040import java.net.InetAddress;
041import java.net.InetSocketAddress;
042import java.security.PrivilegedExceptionAction;
043import java.util.ArrayList;
044import java.util.Collections;
045import java.util.Map;
046import javax.security.sasl.SaslClient;
047import javax.security.sasl.SaslException;
048import org.apache.commons.lang3.RandomStringUtils;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.HBaseTestingUtility;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
053import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
054import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
055import org.apache.hadoop.hbase.ipc.RpcClient;
056import org.apache.hadoop.hbase.ipc.RpcClientFactory;
057import org.apache.hadoop.hbase.ipc.RpcServer;
058import org.apache.hadoop.hbase.ipc.RpcServerFactory;
059import org.apache.hadoop.hbase.security.provider.AuthenticationProviderSelector;
060import org.apache.hadoop.hbase.security.provider.BuiltInProviderSelector;
061import org.apache.hadoop.hbase.security.provider.SaslAuthMethod;
062import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
063import org.apache.hadoop.hbase.util.Pair;
064import org.apache.hadoop.minikdc.MiniKdc;
065import org.apache.hadoop.security.UserGroupInformation;
066import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
067import org.apache.hadoop.security.token.Token;
068import org.apache.hadoop.security.token.TokenIdentifier;
069import org.junit.Test;
070import org.mockito.Mockito;
071
072import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
073import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
074
075import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
076import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
078
079public class AbstractTestSecureIPC {
080
081  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
082
083  protected static final File KEYTAB_FILE =
084    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
085
086  protected static MiniKdc KDC;
087  protected static String HOST = "localhost";
088  protected static String PRINCIPAL;
089
090  protected String krbKeytab;
091  protected String krbPrincipal;
092  protected UserGroupInformation ugi;
093  protected Configuration clientConf;
094  protected Configuration serverConf;
095
096  protected static void initKDCAndConf() throws Exception {
097    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
098    PRINCIPAL = "hbase/" + HOST;
099    KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
100    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
101    // set a smaller timeout and retry to speed up tests
102    TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000);
103    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1);
104    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 100);
105  }
106
107  protected static void stopKDC() {
108    if (KDC != null) {
109      KDC.stop();
110    }
111  }
112
113  protected final void setUpPrincipalAndConf() throws Exception {
114    krbKeytab = getKeytabFileForTesting();
115    krbPrincipal = getPrincipalForTesting();
116    ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
117    clientConf = new Configuration(TEST_UTIL.getConfiguration());
118    setSecuredConfiguration(clientConf);
119    serverConf = new Configuration(TEST_UTIL.getConfiguration());
120    setSecuredConfiguration(serverConf);
121  }
122
123  @Test
124  public void testRpcCallWithEnabledKerberosSaslAuth() throws Exception {
125    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
126
127    // check that the login user is okay:
128    assertSame(ugi2, ugi);
129    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
130    assertEquals(krbPrincipal, ugi.getUserName());
131
132    callRpcService(User.create(ugi2));
133  }
134
135  @Test
136  public void testRpcCallWithEnabledKerberosSaslAuthCanonicalHostname() throws Exception {
137    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
138
139    // check that the login user is okay:
140    assertSame(ugi2, ugi);
141    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
142    assertEquals(krbPrincipal, ugi.getUserName());
143
144    enableCanonicalHostnameTesting(clientConf, "localhost");
145    clientConf.setBoolean(
146      SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, false);
147    clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
148
149    callRpcService(User.create(ugi2));
150  }
151
152  @Test
153  public void testRpcCallWithEnabledKerberosSaslAuthNoCanonicalHostname() throws Exception {
154    UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
155
156    // check that the login user is okay:
157    assertSame(ugi2, ugi);
158    assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
159    assertEquals(krbPrincipal, ugi.getUserName());
160
161    enableCanonicalHostnameTesting(clientConf, "127.0.0.1");
162    clientConf
163      .setBoolean(SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, true);
164    clientConf.set(HBaseKerberosUtils.KRB_PRINCIPAL, "hbase/_HOST@" + KDC.getRealm());
165
166    callRpcService(User.create(ugi2));
167  }
168
169  private static void enableCanonicalHostnameTesting(Configuration conf, String canonicalHostname) {
170    conf.setClass(SELECTOR_KEY, CanonicalHostnameTestingAuthenticationProviderSelector.class,
171      AuthenticationProviderSelector.class);
172    conf.set(CanonicalHostnameTestingAuthenticationProviderSelector.CANONICAL_HOST_NAME_KEY,
173      canonicalHostname);
174  }
175
176  public static class CanonicalHostnameTestingAuthenticationProviderSelector
177    extends BuiltInProviderSelector {
178    private static final String CANONICAL_HOST_NAME_KEY =
179      "CanonicalHostnameTestingAuthenticationProviderSelector.canonicalHostName";
180
181    @Override
182    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
183      selectProvider(String clusterId, User user) {
184      final Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> pair =
185        super.selectProvider(clusterId, user);
186      pair.setFirst(createCanonicalHostNameTestingProvider(pair.getFirst()));
187      return pair;
188    }
189
190    SaslClientAuthenticationProvider
191      createCanonicalHostNameTestingProvider(SaslClientAuthenticationProvider delegate) {
192      return new SaslClientAuthenticationProvider() {
193        @Override
194        public SaslClient createClient(Configuration conf, InetAddress serverAddr,
195          String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
196          Map<String, String> saslProps) throws IOException {
197          final String s = conf.get(CANONICAL_HOST_NAME_KEY);
198          if (s != null) {
199            try {
200              final Field canonicalHostName =
201                InetAddress.class.getDeclaredField("canonicalHostName");
202              canonicalHostName.setAccessible(true);
203              canonicalHostName.set(serverAddr, s);
204            } catch (NoSuchFieldException | IllegalAccessException e) {
205              throw new RuntimeException(e);
206            }
207          }
208
209          return delegate.createClient(conf, serverAddr, serverPrincipal, token, fallbackAllowed,
210            saslProps);
211        }
212
213        @Override
214        public UserInformation getUserInfo(User user) {
215          return delegate.getUserInfo(user);
216        }
217
218        @Override
219        public UserGroupInformation getRealUser(User ugi) {
220          return delegate.getRealUser(ugi);
221        }
222
223        @Override
224        public boolean canRetry() {
225          return delegate.canRetry();
226        }
227
228        @Override
229        public void relogin() throws IOException {
230          delegate.relogin();
231        }
232
233        @Override
234        public SaslAuthMethod getSaslAuthMethod() {
235          return delegate.getSaslAuthMethod();
236        }
237
238        @Override
239        public String getTokenKind() {
240          return delegate.getTokenKind();
241        }
242      };
243    }
244  }
245
246  @Test
247  public void testRpcServerFallbackToSimpleAuth() throws Exception {
248    String clientUsername = "testuser";
249    UserGroupInformation clientUgi =
250      UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
251
252    // check that the client user is insecure
253    assertNotSame(ugi, clientUgi);
254    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
255    assertEquals(clientUsername, clientUgi.getUserName());
256
257    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
258    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, true);
259    callRpcService(User.create(clientUgi));
260  }
261
262  @Test
263  public void testRpcServerDisallowFallbackToSimpleAuth() throws Exception {
264    String clientUsername = "testuser";
265    UserGroupInformation clientUgi =
266      UserGroupInformation.createUserForTesting(clientUsername, new String[] { clientUsername });
267
268    // check that the client user is insecure
269    assertNotSame(ugi, clientUgi);
270    assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
271    assertEquals(clientUsername, clientUgi.getUserName());
272
273    clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
274    serverConf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
275    IOException error =
276      assertThrows(IOException.class, () -> callRpcService(User.create(clientUgi)));
277    // server just closes the connection, so we could get broken pipe, or EOF, or connection closed
278    if (error.getMessage() == null || !error.getMessage().contains("Broken pipe")) {
279      assertThat(error,
280        either(instanceOf(EOFException.class)).or(instanceOf(ConnectionClosedException.class)));
281    }
282  }
283
284  @Test
285  public void testRpcClientFallbackToSimpleAuth() throws Exception {
286    String serverUsername = "testuser";
287    UserGroupInformation serverUgi =
288      UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
289    // check that the server user is insecure
290    assertNotSame(ugi, serverUgi);
291    assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
292    assertEquals(serverUsername, serverUgi.getUserName());
293
294    serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
295    clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true);
296    callRpcService(User.create(serverUgi), User.create(ugi));
297  }
298
299  @Test
300  public void testRpcClientDisallowFallbackToSimpleAuth() throws Exception {
301    String serverUsername = "testuser";
302    UserGroupInformation serverUgi =
303      UserGroupInformation.createUserForTesting(serverUsername, new String[] { serverUsername });
304    // check that the server user is insecure
305    assertNotSame(ugi, serverUgi);
306    assertEquals(AuthenticationMethod.SIMPLE, serverUgi.getAuthenticationMethod());
307    assertEquals(serverUsername, serverUgi.getUserName());
308
309    serverConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
310    clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, false);
311    assertThrows(FallbackDisallowedException.class,
312      () -> callRpcService(User.create(serverUgi), User.create(ugi)));
313  }
314
315  private void setRpcProtection(String clientProtection, String serverProtection) {
316    clientConf.set("hbase.rpc.protection", clientProtection);
317    serverConf.set("hbase.rpc.protection", serverProtection);
318  }
319
320  /**
321   * Test various combinations of Server and Client qops.
322   */
323  @Test
324  public void testSaslWithCommonQop() throws Exception {
325    setRpcProtection("privacy,authentication", "authentication");
326    callRpcService();
327
328    setRpcProtection("authentication", "privacy,authentication");
329    callRpcService();
330
331    setRpcProtection("integrity,authentication", "privacy,authentication");
332    callRpcService();
333
334    setRpcProtection("integrity,authentication", "integrity,authentication");
335    callRpcService();
336
337    setRpcProtection("privacy,authentication", "privacy,authentication");
338    callRpcService();
339  }
340
341  @Test
342  public void testSaslNoCommonQop() throws Exception {
343    setRpcProtection("integrity", "privacy");
344    SaslException se = assertThrows(SaslException.class, () -> callRpcService());
345    assertEquals("No common protection layer between client and server", se.getMessage());
346  }
347
348  /**
349   * Test sasl encryption with Crypto AES.
350   */
351  @Test
352  public void testSaslWithCryptoAES() throws Exception {
353    setRpcProtection("privacy", "privacy");
354    setCryptoAES("true", "true");
355    callRpcService();
356  }
357
358  /**
359   * Test various combinations of Server and Client configuration for Crypto AES.
360   */
361  @Test
362  public void testDifferentConfWithCryptoAES() throws Exception {
363    setRpcProtection("privacy", "privacy");
364
365    setCryptoAES("false", "true");
366    callRpcService();
367
368    setCryptoAES("true", "false");
369    try {
370      callRpcService();
371      fail("The exception should be thrown out for the rpc timeout.");
372    } catch (Exception e) {
373      // ignore the expected exception
374    }
375  }
376
377  private void setCryptoAES(String clientCryptoAES, String serverCryptoAES) {
378    clientConf.set("hbase.rpc.crypto.encryption.aes.enabled", clientCryptoAES);
379    serverConf.set("hbase.rpc.crypto.encryption.aes.enabled", serverCryptoAES);
380  }
381
382  /**
383   * Sets up a RPC Server and a Client. Does a RPC checks the result. If an exception is thrown from
384   * the stub, this function will throw root cause of that exception.
385   */
386  private void callRpcService(User serverUser, User clientUser) throws Exception {
387    SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
388    Mockito.when(securityInfoMock.getServerPrincipals())
389      .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL));
390    SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
391
392    InetSocketAddress isa = new InetSocketAddress(HOST, 0);
393
394    RpcServer rpcServer = serverUser.getUGI()
395      .doAs((PrivilegedExceptionAction<
396        RpcServer>) () -> RpcServerFactory.createRpcServer(null, "AbstractTestSecureIPC",
397          Lists.newArrayList(
398            new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)),
399          isa, serverConf, new FifoRpcScheduler(serverConf, 1)));
400    rpcServer.start();
401    try (RpcClient rpcClient =
402      RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) {
403      BlockingInterface stub =
404        newBlockingStub(rpcClient, rpcServer.getListenerAddress(), clientUser);
405      TestThread th1 = new TestThread(stub);
406      final Throwable exception[] = new Throwable[1];
407      Collections.synchronizedList(new ArrayList<Throwable>());
408      Thread.UncaughtExceptionHandler exceptionHandler = new Thread.UncaughtExceptionHandler() {
409        @Override
410        public void uncaughtException(Thread th, Throwable ex) {
411          exception[0] = ex;
412        }
413      };
414      th1.setUncaughtExceptionHandler(exceptionHandler);
415      th1.start();
416      th1.join();
417      if (exception[0] != null) {
418        // throw root cause.
419        while (exception[0].getCause() != null) {
420          exception[0] = exception[0].getCause();
421        }
422        throw (Exception) exception[0];
423      }
424    } finally {
425      rpcServer.stop();
426    }
427  }
428
429  private void callRpcService(User clientUser) throws Exception {
430    callRpcService(User.create(ugi), clientUser);
431  }
432
433  private void callRpcService() throws Exception {
434    callRpcService(User.create(ugi));
435  }
436
437  public static class TestThread extends Thread {
438    private final BlockingInterface stub;
439
440    public TestThread(BlockingInterface stub) {
441      this.stub = stub;
442    }
443
444    @Override
445    public void run() {
446      try {
447        int[] messageSize = new int[] { 100, 1000, 10000 };
448        for (int i = 0; i < messageSize.length; i++) {
449          String input = RandomStringUtils.random(messageSize[i]);
450          String result =
451            stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(input).build())
452              .getMessage();
453          assertEquals(input, result);
454        }
455      } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
456        throw new RuntimeException(e);
457      }
458    }
459  }
460}