001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.containsString;
022import static org.hamcrest.Matchers.either;
023import static org.hamcrest.Matchers.instanceOf;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertThrows;
026
027import java.io.File;
028import java.io.IOException;
029import java.lang.reflect.UndeclaredThrowableException;
030import java.net.InetSocketAddress;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.List;
035import javax.security.sasl.SaslException;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.CommonConfigurationKeys;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
043import org.apache.hadoop.hbase.security.SecurityInfo;
044import org.apache.hadoop.hbase.security.User;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.testclassification.SecurityTests;
047import org.apache.hadoop.ipc.RemoteException;
048import org.apache.hadoop.minikdc.MiniKdc;
049import org.apache.hadoop.security.UserGroupInformation;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057import org.junit.runner.RunWith;
058import org.junit.runners.Parameterized;
059import org.junit.runners.Parameterized.Parameter;
060import org.junit.runners.Parameterized.Parameters;
061
062import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
063import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
064import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
065import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
066import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
067import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException;
068
069import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
070import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
071
072/**
073 * Tests for HBASE-28321, where we have multiple server principals candidates for a rpc service.
074 * <p>
075 * Put here just because we need to visit some package private classes under this package.
076 */
077@RunWith(Parameterized.class)
078@Category({ SecurityTests.class, MediumTests.class })
079public class TestMultipleServerPrincipalsIPC {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083    HBaseClassTestRule.forClass(TestMultipleServerPrincipalsIPC.class);
084
085  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
086
087  private static final File KEYTAB_FILE =
088    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
089
090  private static MiniKdc KDC;
091  private static String HOST = "localhost";
092  private static String SERVER_PRINCIPAL;
093  private static String SERVER_PRINCIPAL2;
094  private static String CLIENT_PRINCIPAL;
095
096  @Parameter(0)
097  public Class<? extends RpcServer> rpcServerImpl;
098
099  @Parameter(1)
100  public Class<? extends RpcClient> rpcClientImpl;
101
102  private Configuration clientConf;
103  private Configuration serverConf;
104  private UserGroupInformation clientUGI;
105  private UserGroupInformation serverUGI;
106  private RpcServer rpcServer;
107  private RpcClient rpcClient;
108
109  @Parameters(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}")
110  public static List<Object[]> params() {
111    List<Object[]> params = new ArrayList<>();
112    List<Class<? extends RpcServer>> rpcServerImpls =
113      Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class);
114    List<Class<? extends RpcClient>> rpcClientImpls =
115      Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class);
116    for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) {
117      for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) {
118        params.add(new Object[] { rpcServerImpl, rpcClientImpl });
119      }
120    }
121    return params;
122  }
123
124  @BeforeClass
125  public static void setUpBeforeClass() throws Exception {
126    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
127    SERVER_PRINCIPAL = "server/" + HOST;
128    SERVER_PRINCIPAL2 = "server2/" + HOST;
129    CLIENT_PRINCIPAL = "client";
130    KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2);
131    // we need to use the full principal as we will compare them in this form in RpcConnection
132    // but for hadoop2's MiniKDC implementation, the createPrincipal method will always append realm
133    // at the end of the principal passed in, so we can not append the realm before calling
134    // KDC.createPrincipal
135    SERVER_PRINCIPAL += "@" + KDC.getRealm();
136    SERVER_PRINCIPAL2 += "@" + KDC.getRealm();
137    setSecuredConfiguration(TEST_UTIL.getConfiguration());
138    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1);
139    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0);
140    TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10);
141  }
142
143  @AfterClass
144  public static void tearDownAfterClass() {
145    if (KDC != null) {
146      KDC.stop();
147    }
148  }
149
150  private static void setSecuredConfiguration(Configuration conf) {
151    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
152    conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
153    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
154  }
155
156  private void loginAndStartRpcServer(String principal, int port) throws Exception {
157    UserGroupInformation.setConfiguration(serverConf);
158    serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,
159      KEYTAB_FILE.getCanonicalPath());
160    rpcServer = serverUGI.doAs((PrivilegedExceptionAction<
161      RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(),
162        Lists.newArrayList(
163          new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)),
164        new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1)));
165    rpcServer.start();
166  }
167
168  @Before
169  public void setUp() throws Exception {
170    clientConf = new Configuration(TEST_UTIL.getConfiguration());
171    clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl,
172      RpcClient.class);
173    String serverPrincipalConfigName = "hbase.test.multiple.principal.first";
174    String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second";
175    clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL);
176    clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2);
177    serverConf = new Configuration(TEST_UTIL.getConfiguration());
178    serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl,
179      RpcServer.class);
180    SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2,
181      serverPrincipalConfigName);
182    SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo);
183
184    UserGroupInformation.setConfiguration(clientConf);
185    clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL,
186      KEYTAB_FILE.getCanonicalPath());
187    loginAndStartRpcServer(SERVER_PRINCIPAL, 0);
188    rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory
189      .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString()));
190  }
191
192  @After
193  public void tearDown() throws IOException {
194    Closeables.close(rpcClient, true);
195    rpcServer.stop();
196  }
197
198  private String echo(String msg) throws Exception {
199    return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> {
200      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
201        ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(),
202        10000);
203      TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel);
204      return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build())
205        .getMessage();
206    });
207  }
208
209  @Test
210  public void testEcho() throws Exception {
211    String msg = "Hello World";
212    assertEquals(msg, echo(msg));
213  }
214
215  @Test
216  public void testMaliciousServer() throws Exception {
217    // reset the server principals so the principal returned by server does not match
218    SecurityInfo securityInfo =
219      SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName());
220    for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) {
221      clientConf.set(securityInfo.getServerPrincipals().get(i),
222        "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm());
223    }
224    UndeclaredThrowableException error =
225      assertThrows(UndeclaredThrowableException.class, () -> echo("whatever"));
226    assertThat(error.getCause(), instanceOf(ServiceException.class));
227    assertThat(error.getCause().getCause(), instanceOf(SaslException.class));
228  }
229
230  @Test
231  public void testRememberLastSucceededServerPrincipal() throws Exception {
232    // after this call we will remember the last succeeded server principal
233    assertEquals("a", echo("a"));
234    // shutdown the connection, but does not remove it from pool
235    RpcConnection conn =
236      Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values());
237    conn.shutdown();
238    // recreate rpc server with server principal2
239    int port = rpcServer.getListenerAddress().getPort();
240    rpcServer.stop();
241    serverUGI.logoutUserFromKeytab();
242    loginAndStartRpcServer(SERVER_PRINCIPAL2, port);
243    // this time we will still use the remembered server principal, so we will get a sasl exception
244    UndeclaredThrowableException error =
245      assertThrows(UndeclaredThrowableException.class, () -> echo("a"));
246    assertThat(error.getCause(), instanceOf(ServiceException.class));
247    // created by IPCUtil.wrap, to prepend the server address
248    assertThat(error.getCause().getCause(), instanceOf(IOException.class));
249    // wraped IPCUtil.toIOE
250    assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class));
251    Throwable cause = error.getCause().getCause().getCause().getCause();
252    // for netty rpc client, it is DecoderException, for blocking rpc client, it is already
253    // RemoteExcetion
254    assertThat(cause,
255      either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class)));
256    RemoteException rme;
257    if (!(cause instanceof RemoteException)) {
258      assertThat(cause.getCause(), instanceOf(RemoteException.class));
259      rme = (RemoteException) cause.getCause();
260    } else {
261      rme = (RemoteException) cause;
262    }
263    assertEquals(SaslException.class.getName(), rme.getClassName());
264    // the above failure will clear the remembered server principal, so this time we will get the
265    // correct one. We use retry here just because a failure of sasl negotiation will trigger a
266    // relogin and it may take some time, and for netty based implementation the relogin is async
267    TEST_UTIL.waitFor(10000, () -> {
268      try {
269        echo("a");
270      } catch (UndeclaredThrowableException e) {
271        Throwable t = e.getCause().getCause();
272        assertThat(t, instanceOf(IOException.class));
273        if (!(t instanceof FailedServerException)) {
274          // for netty rpc client
275          assertThat(e.getCause().getMessage(),
276            containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS));
277        }
278        return false;
279      }
280      return true;
281    });
282  }
283}