001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.provider;
019
020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.ByteArrayInputStream;
027import java.io.DataInput;
028import java.io.DataInputStream;
029import java.io.DataOutput;
030import java.io.File;
031import java.io.IOException;
032import java.net.InetAddress;
033import java.security.PrivilegedExceptionAction;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.List;
037import java.util.Map;
038import java.util.Optional;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.stream.Collectors;
041import javax.security.auth.callback.Callback;
042import javax.security.auth.callback.CallbackHandler;
043import javax.security.auth.callback.NameCallback;
044import javax.security.auth.callback.PasswordCallback;
045import javax.security.auth.callback.UnsupportedCallbackException;
046import javax.security.sasl.AuthorizeCallback;
047import javax.security.sasl.RealmCallback;
048import javax.security.sasl.RealmChoiceCallback;
049import javax.security.sasl.Sasl;
050import javax.security.sasl.SaslClient;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.Cell;
054import org.apache.hadoop.hbase.CellUtil;
055import org.apache.hadoop.hbase.HBaseTestingUtility;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.LocalHBaseCluster;
058import org.apache.hadoop.hbase.TableName;
059import org.apache.hadoop.hbase.TableNameTestRule;
060import org.apache.hadoop.hbase.client.Admin;
061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
062import org.apache.hadoop.hbase.client.Connection;
063import org.apache.hadoop.hbase.client.ConnectionFactory;
064import org.apache.hadoop.hbase.client.Get;
065import org.apache.hadoop.hbase.client.Put;
066import org.apache.hadoop.hbase.client.Result;
067import org.apache.hadoop.hbase.client.RetriesExhaustedException;
068import org.apache.hadoop.hbase.client.Table;
069import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
070import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
071import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
072import org.apache.hadoop.hbase.ipc.NettyRpcClient;
073import org.apache.hadoop.hbase.ipc.RpcClientFactory;
074import org.apache.hadoop.hbase.ipc.RpcServerFactory;
075import org.apache.hadoop.hbase.security.AccessDeniedException;
076import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
077import org.apache.hadoop.hbase.security.SaslUtil;
078import org.apache.hadoop.hbase.security.User;
079import org.apache.hadoop.hbase.security.token.SecureTestCluster;
080import org.apache.hadoop.hbase.security.token.TokenProvider;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.hadoop.hbase.util.CommonFSUtils;
083import org.apache.hadoop.hbase.util.Pair;
084import org.apache.hadoop.io.Text;
085import org.apache.hadoop.io.WritableUtils;
086import org.apache.hadoop.minikdc.MiniKdc;
087import org.apache.hadoop.security.UserGroupInformation;
088import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
089import org.apache.hadoop.security.token.SecretManager;
090import org.apache.hadoop.security.token.SecretManager.InvalidToken;
091import org.apache.hadoop.security.token.Token;
092import org.apache.hadoop.security.token.TokenIdentifier;
093import org.junit.After;
094import org.junit.AfterClass;
095import org.junit.Before;
096import org.junit.Rule;
097import org.junit.Test;
098import org.junit.runners.Parameterized.Parameter;
099import org.junit.runners.Parameterized.Parameters;
100import org.slf4j.Logger;
101import org.slf4j.LoggerFactory;
102
103import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
104
105import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
106
107/**
108 * Tests the pluggable authentication framework with SASL using a contrived authentication system.
109 * This tests holds a "user database" in memory as a hashmap. Clients provide their password in the
110 * client Hadoop configuration. The servers validate this password via the "user database".
111 */
112public abstract class CustomSaslAuthenticationProviderTestBase {
113
114  private static final Logger LOG =
115    LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class);
116
117  private static final Map<String, String> USER_DATABASE = createUserDatabase();
118
119  private static final String USER1_PASSWORD = "foobarbaz";
120  private static final String USER2_PASSWORD = "bazbarfoo";
121
122  @Parameters
123  public static Collection<Object[]> parameters() {
124    return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() },
125      new Object[] { NettyRpcClient.class.getName() });
126  }
127
128  @Parameter
129  public String rpcClientImpl;
130
131  private static Map<String, String> createUserDatabase() {
132    Map<String, String> db = new ConcurrentHashMap<>();
133    db.put("user1", USER1_PASSWORD);
134    db.put("user2", USER2_PASSWORD);
135    return db;
136  }
137
138  public static String getPassword(String user) {
139    String password = USER_DATABASE.get(user);
140    if (password == null) {
141      throw new IllegalStateException("Cannot request password for a user that doesn't exist");
142    }
143    return password;
144  }
145
146  /**
147   * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used
148   * for delegation tokens.
149   */
150  public static class PasswordAuthTokenIdentifier extends TokenIdentifier {
151    public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN");
152    private String username;
153
154    public PasswordAuthTokenIdentifier() {
155    }
156
157    public PasswordAuthTokenIdentifier(String username) {
158      this.username = username;
159    }
160
161    @Override
162    public void readFields(DataInput in) throws IOException {
163      this.username = WritableUtils.readString(in);
164    }
165
166    @Override
167    public void write(DataOutput out) throws IOException {
168      WritableUtils.writeString(out, username);
169    }
170
171    @Override
172    public Text getKind() {
173      return PASSWORD_AUTH_TOKEN;
174    }
175
176    @Override
177    public UserGroupInformation getUser() {
178      if (username == null || "".equals(username)) {
179        return null;
180      }
181      return UserGroupInformation.createRemoteUser(username);
182    }
183  }
184
185  public static Token<? extends TokenIdentifier> createPasswordToken(String username,
186    String password, String clusterId) {
187    PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username);
188    Token<? extends TokenIdentifier> token =
189      new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId));
190    return token;
191  }
192
193  /**
194   * Client provider that finds custom Token in the user's UGI and authenticates with the server via
195   * DIGEST-MD5 using that password.
196   */
197  public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider {
198    public static final String MECHANISM = "DIGEST-MD5";
199    public static final SaslAuthMethod SASL_AUTH_METHOD =
200      new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN);
201
202    @Override
203    public SaslClient createClient(Configuration conf, InetAddress serverAddr,
204      String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed,
205      Map<String, String> saslProps) throws IOException {
206      return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
207        SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));
208    }
209
210    public Optional<Token<? extends TokenIdentifier>> findToken(User user) {
211      List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream()
212        .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN))
213        .collect(Collectors.toList());
214      if (tokens.isEmpty()) {
215        return Optional.empty();
216      }
217      if (tokens.size() > 1) {
218        throw new IllegalStateException("Cannot handle more than one PasswordAuthToken");
219      }
220      return Optional.of(tokens.get(0));
221    }
222
223    @Override
224    public SaslAuthMethod getSaslAuthMethod() {
225      return SASL_AUTH_METHOD;
226    }
227
228    /**
229     * Sasl CallbackHandler which extracts information from our custom token and places it into the
230     * Sasl objects.
231     */
232    public class InMemoryClientProviderCallbackHandler implements CallbackHandler {
233      private final Token<? extends TokenIdentifier> token;
234
235      public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) {
236        this.token = token;
237      }
238
239      @Override
240      public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
241        NameCallback nc = null;
242        PasswordCallback pc = null;
243        RealmCallback rc = null;
244        for (Callback callback : callbacks) {
245          if (callback instanceof RealmChoiceCallback) {
246            continue;
247          } else if (callback instanceof NameCallback) {
248            nc = (NameCallback) callback;
249          } else if (callback instanceof PasswordCallback) {
250            pc = (PasswordCallback) callback;
251          } else if (callback instanceof RealmCallback) {
252            rc = (RealmCallback) callback;
253          } else {
254            throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
255          }
256        }
257        if (nc != null) {
258          nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier()));
259        }
260        if (pc != null) {
261          pc.setPassword(SaslUtil.encodePassword(token.getPassword()));
262        }
263        if (rc != null) {
264          rc.setText(rc.getDefaultText());
265        }
266      }
267    }
268
269    @Override
270    public UserInformation getUserInfo(User user) {
271      return null;
272    }
273  }
274
275  /**
276   * Server provider which validates credentials from an in-memory database.
277   */
278  public static class InMemoryServerProvider extends InMemoryClientProvider
279    implements SaslServerAuthenticationProvider {
280
281    @Override
282    public AttemptingUserProvidingSaslServer
283      createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps)
284        throws IOException {
285      return new AttemptingUserProvidingSaslServer(
286        Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null,
287          SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()),
288        () -> null);
289    }
290
291    /**
292     * Pulls the correct password for the user who started the SASL handshake so that SASL can
293     * validate that the user provided the right password.
294     */
295    private class InMemoryServerProviderCallbackHandler implements CallbackHandler {
296
297      @Override
298      public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException {
299        NameCallback nc = null;
300        PasswordCallback pc = null;
301        AuthorizeCallback ac = null;
302        for (Callback callback : callbacks) {
303          if (callback instanceof AuthorizeCallback) {
304            ac = (AuthorizeCallback) callback;
305          } else if (callback instanceof NameCallback) {
306            nc = (NameCallback) callback;
307          } else if (callback instanceof PasswordCallback) {
308            pc = (PasswordCallback) callback;
309          } else if (callback instanceof RealmCallback) {
310            continue; // realm is ignored
311          } else {
312            throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback");
313          }
314        }
315        if (nc != null && pc != null) {
316          byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName());
317          PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier();
318          try {
319            id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
320          } catch (IOException e) {
321            throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier")
322              .initCause(e);
323          }
324          char[] actualPassword =
325            SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName())));
326          pc.setPassword(actualPassword);
327        }
328        if (ac != null) {
329          String authid = ac.getAuthenticationID();
330          String authzid = ac.getAuthorizationID();
331          if (authid.equals(authzid)) {
332            ac.setAuthorized(true);
333          } else {
334            ac.setAuthorized(false);
335          }
336          if (ac.isAuthorized()) {
337            ac.setAuthorizedID(authzid);
338          }
339        }
340      }
341    }
342
343    @Override
344    public boolean supportsProtocolAuthentication() {
345      return false;
346    }
347
348    @Override
349    public UserGroupInformation getAuthorizedUgi(String authzId,
350      SecretManager<TokenIdentifier> secretManager) throws IOException {
351      UserGroupInformation authorizedUgi;
352      byte[] encodedId = SaslUtil.decodeIdentifier(authzId);
353      PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier();
354      try {
355        tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId)));
356      } catch (IOException e) {
357        throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e);
358      }
359      authorizedUgi = tokenId.getUser();
360      if (authorizedUgi == null) {
361        throw new AccessDeniedException("Can't retrieve username from tokenIdentifier.");
362      }
363      authorizedUgi.addTokenIdentifier(tokenId);
364      authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod());
365      return authorizedUgi;
366    }
367  }
368
369  /**
370   * Custom provider which can select our custom provider, amongst other tokens which may be
371   * available.
372   */
373  public static class InMemoryProviderSelector extends BuiltInProviderSelector {
374    private InMemoryClientProvider inMemoryProvider;
375
376    @Override
377    public void configure(Configuration conf,
378      Collection<SaslClientAuthenticationProvider> providers) {
379      super.configure(conf, providers);
380      Optional<SaslClientAuthenticationProvider> o =
381        providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny();
382
383      inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException(
384        "InMemoryClientProvider not found in available providers: " + providers));
385    }
386
387    @Override
388    public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>>
389      selectProvider(String clusterId, User user) {
390      Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair =
391        super.selectProvider(clusterId, user);
392
393      Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user);
394      if (optional.isPresent()) {
395        LOG.info("Using InMemoryClientProvider");
396        return new Pair<>(inMemoryProvider, optional.get());
397      }
398
399      LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair);
400      return superPair;
401    }
402  }
403
404  private static void createBaseCluster(HBaseTestingUtility util, File keytabFile, MiniKdc kdc)
405    throws Exception {
406    String servicePrincipal = "hbase/localhost";
407    String spnegoPrincipal = "HTTP/localhost";
408    kdc.createPrincipal(keytabFile, servicePrincipal);
409    util.startMiniZKCluster();
410
411    HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(),
412      servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm());
413    HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class);
414
415    util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
416      TokenProvider.class.getName());
417    util.startMiniDFSCluster(1);
418    Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider");
419    CommonFSUtils.setRootDir(util.getConfiguration(), rootdir);
420  }
421
422  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
423  private static final Configuration CONF = UTIL.getConfiguration();
424  private static LocalHBaseCluster CLUSTER;
425  private static File KEYTAB_FILE;
426
427  protected static void startCluster(String rpcServerImpl) throws Exception {
428    KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath());
429    final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE);
430
431    // Adds our test impls instead of creating service loader entries which
432    // might inadvertently get them loaded on a real cluster.
433    CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY,
434      InMemoryClientProvider.class.getName());
435    CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY,
436      InMemoryServerProvider.class.getName());
437    CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY,
438      InMemoryProviderSelector.class.getName());
439    CONF.setLong(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 600);
440    createBaseCluster(UTIL, KEYTAB_FILE, kdc);
441    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl);
442    CLUSTER = new LocalHBaseCluster(CONF, 1);
443    CLUSTER.startup();
444  }
445
446  @AfterClass
447  public static void shutdownCluster() throws Exception {
448    if (CLUSTER != null) {
449      CLUSTER.shutdown();
450      CLUSTER = null;
451    }
452    UTIL.shutdownMiniDFSCluster();
453    UTIL.shutdownMiniZKCluster();
454    UTIL.cleanupTestDir();
455  }
456
457  @Before
458  public void setUp() throws Exception {
459    createTable();
460  }
461
462  @After
463  public void tearDown() throws IOException {
464    UTIL.deleteTable(name.getTableName());
465  }
466
467  @Rule
468  public TableNameTestRule name = new TableNameTestRule();
469
470  private TableName tableName;
471
472  private String clusterId;
473
474  private void createTable() throws Exception {
475    tableName = name.getTableName();
476
477    // Create a table and write a record as the service user (hbase)
478    UserGroupInformation serviceUgi = UserGroupInformation
479      .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath());
480    clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
481      @Override
482      public String run() throws Exception {
483        try (Connection conn = ConnectionFactory.createConnection(CONF);
484          Admin admin = conn.getAdmin();) {
485          admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
486            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build());
487
488          UTIL.waitTableAvailable(tableName);
489
490          try (Table t = conn.getTable(tableName)) {
491            Put p = new Put(Bytes.toBytes("r1"));
492            p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
493            t.put(p);
494          }
495
496          return admin.getClusterMetrics().getClusterId();
497        }
498      }
499    });
500    assertNotNull(clusterId);
501  }
502
503  private Configuration getClientConf() {
504    Configuration conf = new Configuration(CONF);
505    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl);
506    return conf;
507  }
508
509  @Test
510  public void testPositiveAuthentication() throws Exception {
511    // Validate that we can read that record back out as the user with our custom auth'n
512    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
513    user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId));
514    user1.doAs(new PrivilegedExceptionAction<Void>() {
515      @Override
516      public Void run() throws Exception {
517        try (Connection conn = ConnectionFactory.createConnection(getClientConf());
518          Table t = conn.getTable(tableName)) {
519          Result r = t.get(new Get(Bytes.toBytes("r1")));
520          assertNotNull(r);
521          assertFalse("Should have read a non-empty Result", r.isEmpty());
522          final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1"));
523          assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1")));
524
525          return null;
526        }
527      }
528    });
529  }
530
531  @Test
532  public void testNegativeAuthentication() throws Exception {
533    // Validate that we can read that record back out as the user with our custom auth'n
534    final Configuration clientConf = new Configuration(CONF);
535    // This test does not work with master registry in branch-2 because of a nuance in the non-async
536    // connection implementation. See the detail below.
537    clientConf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
538      HConstants.ZK_CONNECTION_REGISTRY_CLASS);
539    clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
540    UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]);
541    user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId));
542    user1.doAs(new PrivilegedExceptionAction<Void>() {
543      @Override
544      public Void run() throws Exception {
545        // There is a slight behavioral difference here in the 3.x vs 2.x branches. 3.x branches
546        // use async client connection implementation which throws if there is an exception when
547        // fetching the clusterId(). 2.x branches that use non-async client falls back to using a
548        // DEFAULT cluster ID in such cases. 3.x behavior makes more sense, especially if the
549        // exception is of type InvalidToken (digest mis-match), however I did not want to fix it
550        // since it makes sense only when master registry is in use (which has RPCs to master).
551        // That is the reason if you see a slight difference in the test between 3.x and 2.x.
552        try (Connection conn = ConnectionFactory.createConnection(clientConf);
553          Table t = conn.getTable(tableName)) {
554          t.get(new Get(Bytes.toBytes("r1")));
555          fail("Should not successfully authenticate with HBase");
556        } catch (RetriesExhaustedException re) {
557          assertTrue(re.getMessage(), re.getMessage().contains("SaslException"));
558        } catch (Exception e) {
559          // Any other exception is unexpected.
560          fail("Unexpected exception caught, was expecting a authentication error: "
561            + Throwables.getStackTraceAsString(e));
562        }
563        return null;
564      }
565    });
566  }
567}