001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.containsString; 022import static org.hamcrest.Matchers.either; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertThrows; 026 027import java.io.File; 028import java.io.IOException; 029import java.lang.reflect.UndeclaredThrowableException; 030import java.net.InetSocketAddress; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.List; 035import javax.security.sasl.SaslException; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.CommonConfigurationKeys; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; 043import org.apache.hadoop.hbase.security.SecurityInfo; 044import org.apache.hadoop.hbase.security.User; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.SecurityTests; 047import org.apache.hadoop.ipc.RemoteException; 048import org.apache.hadoop.minikdc.MiniKdc; 049import org.apache.hadoop.security.UserGroupInformation; 050import org.junit.After; 051import org.junit.AfterClass; 052import org.junit.Before; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057import org.junit.runner.RunWith; 058import org.junit.runners.Parameterized; 059import org.junit.runners.Parameterized.Parameter; 060import org.junit.runners.Parameterized.Parameters; 061 062import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 063import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 064import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 065import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 066import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 067import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException; 068 069import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; 070import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 071 072/** 073 * Tests for HBASE-28321, where we have multiple server principals candidates for a rpc service. 074 * <p> 075 * Put here just because we need to visit some package private classes under this package. 076 */ 077@RunWith(Parameterized.class) 078@Category({ SecurityTests.class, MediumTests.class }) 079public class TestMultipleServerPrincipalsIPC { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestMultipleServerPrincipalsIPC.class); 084 085 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 086 087 private static final File KEYTAB_FILE = 088 new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); 089 090 private static MiniKdc KDC; 091 private static String HOST = "localhost"; 092 private static String SERVER_PRINCIPAL; 093 private static String SERVER_PRINCIPAL2; 094 private static String CLIENT_PRINCIPAL; 095 096 @Parameter(0) 097 public Class<? extends RpcServer> rpcServerImpl; 098 099 @Parameter(1) 100 public Class<? extends RpcClient> rpcClientImpl; 101 102 private Configuration clientConf; 103 private Configuration serverConf; 104 private UserGroupInformation clientUGI; 105 private UserGroupInformation serverUGI; 106 private RpcServer rpcServer; 107 private RpcClient rpcClient; 108 109 @Parameters(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}") 110 public static List<Object[]> params() { 111 List<Object[]> params = new ArrayList<>(); 112 List<Class<? extends RpcServer>> rpcServerImpls = 113 Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class); 114 List<Class<? extends RpcClient>> rpcClientImpls = 115 Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class); 116 for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) { 117 for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) { 118 params.add(new Object[] { rpcServerImpl, rpcClientImpl }); 119 } 120 } 121 return params; 122 } 123 124 @BeforeClass 125 public static void setUpBeforeClass() throws Exception { 126 KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); 127 SERVER_PRINCIPAL = "server/" + HOST; 128 SERVER_PRINCIPAL2 = "server2/" + HOST; 129 CLIENT_PRINCIPAL = "client"; 130 KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2); 131 // we need to use the full principal as we will compare them in this form in RpcConnection 132 // but for hadoop2's MiniKDC implementation, the createPrincipal method will always append realm 133 // at the end of the principal passed in, so we can not append the realm before calling 134 // KDC.createPrincipal 135 SERVER_PRINCIPAL += "@" + KDC.getRealm(); 136 SERVER_PRINCIPAL2 += "@" + KDC.getRealm(); 137 setSecuredConfiguration(TEST_UTIL.getConfiguration()); 138 TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1); 139 TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0); 140 TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10); 141 } 142 143 @AfterClass 144 public static void tearDownAfterClass() { 145 if (KDC != null) { 146 KDC.stop(); 147 } 148 } 149 150 private static void setSecuredConfiguration(Configuration conf) { 151 conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); 152 conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos"); 153 conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true); 154 } 155 156 private void loginAndStartRpcServer(String principal, int port) throws Exception { 157 UserGroupInformation.setConfiguration(serverConf); 158 serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 159 KEYTAB_FILE.getCanonicalPath()); 160 rpcServer = serverUGI.doAs((PrivilegedExceptionAction< 161 RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(), 162 Lists.newArrayList( 163 new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)), 164 new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1))); 165 rpcServer.start(); 166 } 167 168 @Before 169 public void setUp() throws Exception { 170 clientConf = new Configuration(TEST_UTIL.getConfiguration()); 171 clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl, 172 RpcClient.class); 173 String serverPrincipalConfigName = "hbase.test.multiple.principal.first"; 174 String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second"; 175 clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL); 176 clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2); 177 serverConf = new Configuration(TEST_UTIL.getConfiguration()); 178 serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, 179 RpcServer.class); 180 SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2, 181 serverPrincipalConfigName); 182 SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo); 183 184 UserGroupInformation.setConfiguration(clientConf); 185 clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, 186 KEYTAB_FILE.getCanonicalPath()); 187 loginAndStartRpcServer(SERVER_PRINCIPAL, 0); 188 rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory 189 .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())); 190 } 191 192 @After 193 public void tearDown() throws IOException { 194 Closeables.close(rpcClient, true); 195 rpcServer.stop(); 196 } 197 198 private String echo(String msg) throws Exception { 199 return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> { 200 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( 201 ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(), 202 10000); 203 TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel); 204 return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build()) 205 .getMessage(); 206 }); 207 } 208 209 @Test 210 public void testEcho() throws Exception { 211 String msg = "Hello World"; 212 assertEquals(msg, echo(msg)); 213 } 214 215 @Test 216 public void testMaliciousServer() throws Exception { 217 // reset the server principals so the principal returned by server does not match 218 SecurityInfo securityInfo = 219 SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName()); 220 for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) { 221 clientConf.set(securityInfo.getServerPrincipals().get(i), 222 "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm()); 223 } 224 UndeclaredThrowableException error = 225 assertThrows(UndeclaredThrowableException.class, () -> echo("whatever")); 226 assertThat(error.getCause(), instanceOf(ServiceException.class)); 227 assertThat(error.getCause().getCause(), instanceOf(SaslException.class)); 228 } 229 230 @Test 231 public void testRememberLastSucceededServerPrincipal() throws Exception { 232 // after this call we will remember the last succeeded server principal 233 assertEquals("a", echo("a")); 234 // shutdown the connection, but does not remove it from pool 235 RpcConnection conn = 236 Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values()); 237 conn.shutdown(); 238 // recreate rpc server with server principal2 239 int port = rpcServer.getListenerAddress().getPort(); 240 rpcServer.stop(); 241 serverUGI.logoutUserFromKeytab(); 242 loginAndStartRpcServer(SERVER_PRINCIPAL2, port); 243 // this time we will still use the remembered server principal, so we will get a sasl exception 244 UndeclaredThrowableException error = 245 assertThrows(UndeclaredThrowableException.class, () -> echo("a")); 246 assertThat(error.getCause(), instanceOf(ServiceException.class)); 247 // created by IPCUtil.wrap, to prepend the server address 248 assertThat(error.getCause().getCause(), instanceOf(IOException.class)); 249 // wraped IPCUtil.toIOE 250 assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class)); 251 Throwable cause = error.getCause().getCause().getCause().getCause(); 252 // for netty rpc client, it is DecoderException, for blocking rpc client, it is already 253 // RemoteExcetion 254 assertThat(cause, 255 either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class))); 256 RemoteException rme; 257 if (!(cause instanceof RemoteException)) { 258 assertThat(cause.getCause(), instanceOf(RemoteException.class)); 259 rme = (RemoteException) cause.getCause(); 260 } else { 261 rme = (RemoteException) cause; 262 } 263 assertEquals(SaslException.class.getName(), rme.getClassName()); 264 // the above failure will clear the remembered server principal, so this time we will get the 265 // correct one. We use retry here just because a failure of sasl negotiation will trigger a 266 // relogin and it may take some time, and for netty based implementation the relogin is async 267 TEST_UTIL.waitFor(10000, () -> { 268 try { 269 echo("a"); 270 } catch (UndeclaredThrowableException e) { 271 Throwable t = e.getCause().getCause(); 272 assertThat(t, instanceOf(IOException.class)); 273 if (!(t instanceof FailedServerException)) { 274 // for netty rpc client 275 assertThat(e.getCause().getMessage(), 276 containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS)); 277 } 278 return false; 279 } 280 return true; 281 }); 282 } 283}