001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.provider; 019 020import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.ByteArrayInputStream; 027import java.io.DataInput; 028import java.io.DataInputStream; 029import java.io.DataOutput; 030import java.io.File; 031import java.io.IOException; 032import java.net.InetAddress; 033import java.security.PrivilegedExceptionAction; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.List; 037import java.util.Map; 038import java.util.Optional; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.stream.Collectors; 041import javax.security.auth.callback.Callback; 042import javax.security.auth.callback.CallbackHandler; 043import javax.security.auth.callback.NameCallback; 044import javax.security.auth.callback.PasswordCallback; 045import javax.security.auth.callback.UnsupportedCallbackException; 046import javax.security.sasl.AuthorizeCallback; 047import javax.security.sasl.RealmCallback; 048import javax.security.sasl.RealmChoiceCallback; 049import javax.security.sasl.Sasl; 050import javax.security.sasl.SaslClient; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.hbase.Cell; 054import org.apache.hadoop.hbase.CellUtil; 055import org.apache.hadoop.hbase.HBaseTestingUtility; 056import org.apache.hadoop.hbase.HConstants; 057import org.apache.hadoop.hbase.LocalHBaseCluster; 058import org.apache.hadoop.hbase.TableName; 059import org.apache.hadoop.hbase.TableNameTestRule; 060import org.apache.hadoop.hbase.client.Admin; 061import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 062import org.apache.hadoop.hbase.client.Connection; 063import org.apache.hadoop.hbase.client.ConnectionFactory; 064import org.apache.hadoop.hbase.client.Get; 065import org.apache.hadoop.hbase.client.Put; 066import org.apache.hadoop.hbase.client.Result; 067import org.apache.hadoop.hbase.client.RetriesExhaustedException; 068import org.apache.hadoop.hbase.client.Table; 069import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 070import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 071import org.apache.hadoop.hbase.ipc.BlockingRpcClient; 072import org.apache.hadoop.hbase.ipc.NettyRpcClient; 073import org.apache.hadoop.hbase.ipc.RpcClientFactory; 074import org.apache.hadoop.hbase.ipc.RpcServerFactory; 075import org.apache.hadoop.hbase.security.AccessDeniedException; 076import org.apache.hadoop.hbase.security.HBaseKerberosUtils; 077import org.apache.hadoop.hbase.security.SaslUtil; 078import org.apache.hadoop.hbase.security.User; 079import org.apache.hadoop.hbase.security.token.SecureTestCluster; 080import org.apache.hadoop.hbase.security.token.TokenProvider; 081import org.apache.hadoop.hbase.util.Bytes; 082import org.apache.hadoop.hbase.util.CommonFSUtils; 083import org.apache.hadoop.hbase.util.Pair; 084import org.apache.hadoop.io.Text; 085import org.apache.hadoop.io.WritableUtils; 086import org.apache.hadoop.minikdc.MiniKdc; 087import org.apache.hadoop.security.UserGroupInformation; 088import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; 089import org.apache.hadoop.security.token.SecretManager; 090import org.apache.hadoop.security.token.SecretManager.InvalidToken; 091import org.apache.hadoop.security.token.Token; 092import org.apache.hadoop.security.token.TokenIdentifier; 093import org.junit.After; 094import org.junit.AfterClass; 095import org.junit.Before; 096import org.junit.Rule; 097import org.junit.Test; 098import org.junit.runners.Parameterized.Parameter; 099import org.junit.runners.Parameterized.Parameters; 100import org.slf4j.Logger; 101import org.slf4j.LoggerFactory; 102 103import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 104 105import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; 106 107/** 108 * Tests the pluggable authentication framework with SASL using a contrived authentication system. 109 * This tests holds a "user database" in memory as a hashmap. Clients provide their password in the 110 * client Hadoop configuration. The servers validate this password via the "user database". 111 */ 112public abstract class CustomSaslAuthenticationProviderTestBase { 113 114 private static final Logger LOG = 115 LoggerFactory.getLogger(CustomSaslAuthenticationProviderTestBase.class); 116 117 private static final Map<String, String> USER_DATABASE = createUserDatabase(); 118 119 private static final String USER1_PASSWORD = "foobarbaz"; 120 private static final String USER2_PASSWORD = "bazbarfoo"; 121 122 @Parameters 123 public static Collection<Object[]> parameters() { 124 return Arrays.asList(new Object[] { BlockingRpcClient.class.getName() }, 125 new Object[] { NettyRpcClient.class.getName() }); 126 } 127 128 @Parameter 129 public String rpcClientImpl; 130 131 private static Map<String, String> createUserDatabase() { 132 Map<String, String> db = new ConcurrentHashMap<>(); 133 db.put("user1", USER1_PASSWORD); 134 db.put("user2", USER2_PASSWORD); 135 return db; 136 } 137 138 public static String getPassword(String user) { 139 String password = USER_DATABASE.get(user); 140 if (password == null) { 141 throw new IllegalStateException("Cannot request password for a user that doesn't exist"); 142 } 143 return password; 144 } 145 146 /** 147 * A custom token identifier for our custom auth'n method. Unique from the TokenIdentifier used 148 * for delegation tokens. 149 */ 150 public static class PasswordAuthTokenIdentifier extends TokenIdentifier { 151 public static final Text PASSWORD_AUTH_TOKEN = new Text("HBASE_PASSWORD_TEST_TOKEN"); 152 private String username; 153 154 public PasswordAuthTokenIdentifier() { 155 } 156 157 public PasswordAuthTokenIdentifier(String username) { 158 this.username = username; 159 } 160 161 @Override 162 public void readFields(DataInput in) throws IOException { 163 this.username = WritableUtils.readString(in); 164 } 165 166 @Override 167 public void write(DataOutput out) throws IOException { 168 WritableUtils.writeString(out, username); 169 } 170 171 @Override 172 public Text getKind() { 173 return PASSWORD_AUTH_TOKEN; 174 } 175 176 @Override 177 public UserGroupInformation getUser() { 178 if (username == null || "".equals(username)) { 179 return null; 180 } 181 return UserGroupInformation.createRemoteUser(username); 182 } 183 } 184 185 public static Token<? extends TokenIdentifier> createPasswordToken(String username, 186 String password, String clusterId) { 187 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(username); 188 Token<? extends TokenIdentifier> token = 189 new Token<>(id.getBytes(), Bytes.toBytes(password), id.getKind(), new Text(clusterId)); 190 return token; 191 } 192 193 /** 194 * Client provider that finds custom Token in the user's UGI and authenticates with the server via 195 * DIGEST-MD5 using that password. 196 */ 197 public static class InMemoryClientProvider extends AbstractSaslClientAuthenticationProvider { 198 public static final String MECHANISM = "DIGEST-MD5"; 199 public static final SaslAuthMethod SASL_AUTH_METHOD = 200 new SaslAuthMethod("IN_MEMORY", (byte) 42, MECHANISM, AuthenticationMethod.TOKEN); 201 202 @Override 203 public SaslClient createClient(Configuration conf, InetAddress serverAddr, 204 String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 205 Map<String, String> saslProps) throws IOException { 206 return Sasl.createSaslClient(new String[] { MECHANISM }, null, null, 207 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token)); 208 } 209 210 public Optional<Token<? extends TokenIdentifier>> findToken(User user) { 211 List<Token<? extends TokenIdentifier>> tokens = user.getTokens().stream() 212 .filter((token) -> token.getKind().equals(PasswordAuthTokenIdentifier.PASSWORD_AUTH_TOKEN)) 213 .collect(Collectors.toList()); 214 if (tokens.isEmpty()) { 215 return Optional.empty(); 216 } 217 if (tokens.size() > 1) { 218 throw new IllegalStateException("Cannot handle more than one PasswordAuthToken"); 219 } 220 return Optional.of(tokens.get(0)); 221 } 222 223 @Override 224 public SaslAuthMethod getSaslAuthMethod() { 225 return SASL_AUTH_METHOD; 226 } 227 228 /** 229 * Sasl CallbackHandler which extracts information from our custom token and places it into the 230 * Sasl objects. 231 */ 232 public class InMemoryClientProviderCallbackHandler implements CallbackHandler { 233 private final Token<? extends TokenIdentifier> token; 234 235 public InMemoryClientProviderCallbackHandler(Token<? extends TokenIdentifier> token) { 236 this.token = token; 237 } 238 239 @Override 240 public void handle(Callback[] callbacks) throws UnsupportedCallbackException { 241 NameCallback nc = null; 242 PasswordCallback pc = null; 243 RealmCallback rc = null; 244 for (Callback callback : callbacks) { 245 if (callback instanceof RealmChoiceCallback) { 246 continue; 247 } else if (callback instanceof NameCallback) { 248 nc = (NameCallback) callback; 249 } else if (callback instanceof PasswordCallback) { 250 pc = (PasswordCallback) callback; 251 } else if (callback instanceof RealmCallback) { 252 rc = (RealmCallback) callback; 253 } else { 254 throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); 255 } 256 } 257 if (nc != null) { 258 nc.setName(SaslUtil.encodeIdentifier(token.getIdentifier())); 259 } 260 if (pc != null) { 261 pc.setPassword(SaslUtil.encodePassword(token.getPassword())); 262 } 263 if (rc != null) { 264 rc.setText(rc.getDefaultText()); 265 } 266 } 267 } 268 269 @Override 270 public UserInformation getUserInfo(User user) { 271 return null; 272 } 273 } 274 275 /** 276 * Server provider which validates credentials from an in-memory database. 277 */ 278 public static class InMemoryServerProvider extends InMemoryClientProvider 279 implements SaslServerAuthenticationProvider { 280 281 @Override 282 public AttemptingUserProvidingSaslServer 283 createServer(SecretManager<TokenIdentifier> secretManager, Map<String, String> saslProps) 284 throws IOException { 285 return new AttemptingUserProvidingSaslServer( 286 Sasl.createSaslServer(getSaslAuthMethod().getSaslMechanism(), null, 287 SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryServerProviderCallbackHandler()), 288 () -> null); 289 } 290 291 /** 292 * Pulls the correct password for the user who started the SASL handshake so that SASL can 293 * validate that the user provided the right password. 294 */ 295 private class InMemoryServerProviderCallbackHandler implements CallbackHandler { 296 297 @Override 298 public void handle(Callback[] callbacks) throws InvalidToken, UnsupportedCallbackException { 299 NameCallback nc = null; 300 PasswordCallback pc = null; 301 AuthorizeCallback ac = null; 302 for (Callback callback : callbacks) { 303 if (callback instanceof AuthorizeCallback) { 304 ac = (AuthorizeCallback) callback; 305 } else if (callback instanceof NameCallback) { 306 nc = (NameCallback) callback; 307 } else if (callback instanceof PasswordCallback) { 308 pc = (PasswordCallback) callback; 309 } else if (callback instanceof RealmCallback) { 310 continue; // realm is ignored 311 } else { 312 throw new UnsupportedCallbackException(callback, "Unrecognized SASL Callback"); 313 } 314 } 315 if (nc != null && pc != null) { 316 byte[] encodedId = SaslUtil.decodeIdentifier(nc.getDefaultName()); 317 PasswordAuthTokenIdentifier id = new PasswordAuthTokenIdentifier(); 318 try { 319 id.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 320 } catch (IOException e) { 321 throw (InvalidToken) new InvalidToken("Can't de-serialize tokenIdentifier") 322 .initCause(e); 323 } 324 char[] actualPassword = 325 SaslUtil.encodePassword(Bytes.toBytes(getPassword(id.getUser().getUserName()))); 326 pc.setPassword(actualPassword); 327 } 328 if (ac != null) { 329 String authid = ac.getAuthenticationID(); 330 String authzid = ac.getAuthorizationID(); 331 if (authid.equals(authzid)) { 332 ac.setAuthorized(true); 333 } else { 334 ac.setAuthorized(false); 335 } 336 if (ac.isAuthorized()) { 337 ac.setAuthorizedID(authzid); 338 } 339 } 340 } 341 } 342 343 @Override 344 public boolean supportsProtocolAuthentication() { 345 return false; 346 } 347 348 @Override 349 public UserGroupInformation getAuthorizedUgi(String authzId, 350 SecretManager<TokenIdentifier> secretManager) throws IOException { 351 UserGroupInformation authorizedUgi; 352 byte[] encodedId = SaslUtil.decodeIdentifier(authzId); 353 PasswordAuthTokenIdentifier tokenId = new PasswordAuthTokenIdentifier(); 354 try { 355 tokenId.readFields(new DataInputStream(new ByteArrayInputStream(encodedId))); 356 } catch (IOException e) { 357 throw new IOException("Can't de-serialize PasswordAuthTokenIdentifier", e); 358 } 359 authorizedUgi = tokenId.getUser(); 360 if (authorizedUgi == null) { 361 throw new AccessDeniedException("Can't retrieve username from tokenIdentifier."); 362 } 363 authorizedUgi.addTokenIdentifier(tokenId); 364 authorizedUgi.setAuthenticationMethod(getSaslAuthMethod().getAuthMethod()); 365 return authorizedUgi; 366 } 367 } 368 369 /** 370 * Custom provider which can select our custom provider, amongst other tokens which may be 371 * available. 372 */ 373 public static class InMemoryProviderSelector extends BuiltInProviderSelector { 374 private InMemoryClientProvider inMemoryProvider; 375 376 @Override 377 public void configure(Configuration conf, 378 Collection<SaslClientAuthenticationProvider> providers) { 379 super.configure(conf, providers); 380 Optional<SaslClientAuthenticationProvider> o = 381 providers.stream().filter((p) -> p instanceof InMemoryClientProvider).findAny(); 382 383 inMemoryProvider = (InMemoryClientProvider) o.orElseThrow(() -> new RuntimeException( 384 "InMemoryClientProvider not found in available providers: " + providers)); 385 } 386 387 @Override 388 public Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> 389 selectProvider(String clusterId, User user) { 390 Pair<SaslClientAuthenticationProvider, Token<? extends TokenIdentifier>> superPair = 391 super.selectProvider(clusterId, user); 392 393 Optional<Token<? extends TokenIdentifier>> optional = inMemoryProvider.findToken(user); 394 if (optional.isPresent()) { 395 LOG.info("Using InMemoryClientProvider"); 396 return new Pair<>(inMemoryProvider, optional.get()); 397 } 398 399 LOG.info("InMemoryClientProvider not usable, falling back to {}", superPair); 400 return superPair; 401 } 402 } 403 404 private static void createBaseCluster(HBaseTestingUtility util, File keytabFile, MiniKdc kdc) 405 throws Exception { 406 String servicePrincipal = "hbase/localhost"; 407 String spnegoPrincipal = "HTTP/localhost"; 408 kdc.createPrincipal(keytabFile, servicePrincipal); 409 util.startMiniZKCluster(); 410 411 HBaseKerberosUtils.setSecuredConfiguration(util.getConfiguration(), 412 servicePrincipal + "@" + kdc.getRealm(), spnegoPrincipal + "@" + kdc.getRealm()); 413 HBaseKerberosUtils.setSSLConfiguration(util, SecureTestCluster.class); 414 415 util.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 416 TokenProvider.class.getName()); 417 util.startMiniDFSCluster(1); 418 Path rootdir = util.getDataTestDirOnTestFS("TestCustomSaslAuthenticationProvider"); 419 CommonFSUtils.setRootDir(util.getConfiguration(), rootdir); 420 } 421 422 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 423 private static final Configuration CONF = UTIL.getConfiguration(); 424 private static LocalHBaseCluster CLUSTER; 425 private static File KEYTAB_FILE; 426 427 protected static void startCluster(String rpcServerImpl) throws Exception { 428 KEYTAB_FILE = new File(UTIL.getDataTestDir("keytab").toUri().getPath()); 429 final MiniKdc kdc = UTIL.setupMiniKdc(KEYTAB_FILE); 430 431 // Adds our test impls instead of creating service loader entries which 432 // might inadvertently get them loaded on a real cluster. 433 CONF.setStrings(SaslClientAuthenticationProviders.EXTRA_PROVIDERS_KEY, 434 InMemoryClientProvider.class.getName()); 435 CONF.setStrings(SaslServerAuthenticationProviders.EXTRA_PROVIDERS_KEY, 436 InMemoryServerProvider.class.getName()); 437 CONF.set(SaslClientAuthenticationProviders.SELECTOR_KEY, 438 InMemoryProviderSelector.class.getName()); 439 CONF.setLong(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN, 600); 440 createBaseCluster(UTIL, KEYTAB_FILE, kdc); 441 CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl); 442 CLUSTER = new LocalHBaseCluster(CONF, 1); 443 CLUSTER.startup(); 444 } 445 446 @AfterClass 447 public static void shutdownCluster() throws Exception { 448 if (CLUSTER != null) { 449 CLUSTER.shutdown(); 450 CLUSTER = null; 451 } 452 UTIL.shutdownMiniDFSCluster(); 453 UTIL.shutdownMiniZKCluster(); 454 UTIL.cleanupTestDir(); 455 } 456 457 @Before 458 public void setUp() throws Exception { 459 createTable(); 460 } 461 462 @After 463 public void tearDown() throws IOException { 464 UTIL.deleteTable(name.getTableName()); 465 } 466 467 @Rule 468 public TableNameTestRule name = new TableNameTestRule(); 469 470 private TableName tableName; 471 472 private String clusterId; 473 474 private void createTable() throws Exception { 475 tableName = name.getTableName(); 476 477 // Create a table and write a record as the service user (hbase) 478 UserGroupInformation serviceUgi = UserGroupInformation 479 .loginUserFromKeytabAndReturnUGI("hbase/localhost", KEYTAB_FILE.getAbsolutePath()); 480 clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() { 481 @Override 482 public String run() throws Exception { 483 try (Connection conn = ConnectionFactory.createConnection(CONF); 484 Admin admin = conn.getAdmin();) { 485 admin.createTable(TableDescriptorBuilder.newBuilder(tableName) 486 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1")).build()); 487 488 UTIL.waitTableAvailable(tableName); 489 490 try (Table t = conn.getTable(tableName)) { 491 Put p = new Put(Bytes.toBytes("r1")); 492 p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1")); 493 t.put(p); 494 } 495 496 return admin.getClusterMetrics().getClusterId(); 497 } 498 } 499 }); 500 assertNotNull(clusterId); 501 } 502 503 private Configuration getClientConf() { 504 Configuration conf = new Configuration(CONF); 505 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl); 506 return conf; 507 } 508 509 @Test 510 public void testPositiveAuthentication() throws Exception { 511 // Validate that we can read that record back out as the user with our custom auth'n 512 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 513 user1.addToken(createPasswordToken("user1", USER1_PASSWORD, clusterId)); 514 user1.doAs(new PrivilegedExceptionAction<Void>() { 515 @Override 516 public Void run() throws Exception { 517 try (Connection conn = ConnectionFactory.createConnection(getClientConf()); 518 Table t = conn.getTable(tableName)) { 519 Result r = t.get(new Get(Bytes.toBytes("r1"))); 520 assertNotNull(r); 521 assertFalse("Should have read a non-empty Result", r.isEmpty()); 522 final Cell cell = r.getColumnLatestCell(Bytes.toBytes("f1"), Bytes.toBytes("q1")); 523 assertTrue("Unexpected value", CellUtil.matchingValue(cell, Bytes.toBytes("1"))); 524 525 return null; 526 } 527 } 528 }); 529 } 530 531 @Test 532 public void testNegativeAuthentication() throws Exception { 533 // Validate that we can read that record back out as the user with our custom auth'n 534 final Configuration clientConf = new Configuration(CONF); 535 // This test does not work with master registry in branch-2 because of a nuance in the non-async 536 // connection implementation. See the detail below. 537 clientConf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, 538 HConstants.ZK_CONNECTION_REGISTRY_CLASS); 539 clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 540 UserGroupInformation user1 = UserGroupInformation.createUserForTesting("user1", new String[0]); 541 user1.addToken(createPasswordToken("user1", "definitely not the password", clusterId)); 542 user1.doAs(new PrivilegedExceptionAction<Void>() { 543 @Override 544 public Void run() throws Exception { 545 // There is a slight behavioral difference here in the 3.x vs 2.x branches. 3.x branches 546 // use async client connection implementation which throws if there is an exception when 547 // fetching the clusterId(). 2.x branches that use non-async client falls back to using a 548 // DEFAULT cluster ID in such cases. 3.x behavior makes more sense, especially if the 549 // exception is of type InvalidToken (digest mis-match), however I did not want to fix it 550 // since it makes sense only when master registry is in use (which has RPCs to master). 551 // That is the reason if you see a slight difference in the test between 3.x and 2.x. 552 try (Connection conn = ConnectionFactory.createConnection(clientConf); 553 Table t = conn.getTable(tableName)) { 554 t.get(new Get(Bytes.toBytes("r1"))); 555 fail("Should not successfully authenticate with HBase"); 556 } catch (RetriesExhaustedException re) { 557 assertTrue(re.getMessage(), re.getMessage().contains("SaslException")); 558 } catch (Exception e) { 559 // Any other exception is unexpected. 560 fail("Unexpected exception caught, was expecting a authentication error: " 561 + Throwables.getStackTraceAsString(e)); 562 } 563 return null; 564 } 565 }); 566 } 567}