001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; 026import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId; 027import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; 028import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; 029import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub; 030import static org.hamcrest.MatcherAssert.assertThat; 031import static org.hamcrest.Matchers.allOf; 032import static org.hamcrest.Matchers.containsString; 033import static org.hamcrest.Matchers.everyItem; 034import static org.hamcrest.Matchers.greaterThanOrEqualTo; 035import static org.hamcrest.Matchers.hasItem; 036import static org.hamcrest.Matchers.instanceOf; 037import static org.hamcrest.Matchers.startsWith; 038import static org.junit.Assert.assertEquals; 039import static org.junit.Assert.assertFalse; 040import static org.junit.Assert.assertNotNull; 041import static org.junit.Assert.assertNull; 042import static org.junit.Assert.assertThrows; 043import static org.junit.Assert.assertTrue; 044import static org.junit.Assert.fail; 045import static org.mockito.ArgumentMatchers.any; 046import static org.mockito.Mockito.mock; 047import static org.mockito.Mockito.spy; 048import static org.mockito.Mockito.verify; 049import static org.mockito.Mockito.when; 050import static org.mockito.internal.verification.VerificationModeFactory.times; 051 052import io.opentelemetry.api.common.AttributeKey; 053import io.opentelemetry.api.trace.SpanKind; 054import io.opentelemetry.api.trace.StatusCode; 055import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; 056import io.opentelemetry.sdk.trace.data.SpanData; 057import java.io.IOException; 058import java.net.InetSocketAddress; 059import java.nio.channels.SocketChannel; 060import java.time.Duration; 061import java.util.ArrayList; 062import java.util.Collections; 063import java.util.List; 064import org.apache.hadoop.conf.Configuration; 065import org.apache.hadoop.hbase.Cell; 066import org.apache.hadoop.hbase.CellScanner; 067import org.apache.hadoop.hbase.CellUtil; 068import org.apache.hadoop.hbase.DoNotRetryIOException; 069import org.apache.hadoop.hbase.HBaseConfiguration; 070import org.apache.hadoop.hbase.KeyValue; 071import org.apache.hadoop.hbase.MatcherPredicate; 072import org.apache.hadoop.hbase.Server; 073import org.apache.hadoop.hbase.ServerName; 074import org.apache.hadoop.hbase.Waiter; 075import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; 076import org.apache.hadoop.hbase.nio.ByteBuff; 077import org.apache.hadoop.hbase.regionserver.HRegionServer; 078import org.apache.hadoop.hbase.security.User; 079import org.apache.hadoop.hbase.util.Bytes; 080import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 081import org.apache.hadoop.io.compress.GzipCodec; 082import org.apache.hadoop.ipc.RemoteException; 083import org.apache.hadoop.util.StringUtils; 084import org.hamcrest.Matcher; 085import org.junit.Before; 086import org.junit.Rule; 087import org.junit.Test; 088import org.junit.runners.Parameterized.Parameter; 089import org.slf4j.Logger; 090import org.slf4j.LoggerFactory; 091 092import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; 093import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 094import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 095import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; 096import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 097 098import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 099import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 100import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 101import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 102import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 103import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 104import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 105import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; 109 110/** 111 * Some basic ipc tests. 112 */ 113public abstract class AbstractTestIPC { 114 115 private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class); 116 117 private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); 118 private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); 119 120 protected static final Configuration CONF = HBaseConfiguration.create(); 121 122 protected RpcServer createRpcServer(Server server, String name, 123 List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf, 124 RpcScheduler scheduler) throws IOException { 125 return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler); 126 } 127 128 private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services, 129 InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException { 130 return createRpcServer(null, name, services, bindAddress, conf, scheduler); 131 } 132 133 protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf); 134 135 @Rule 136 public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); 137 138 @Parameter(0) 139 public Class<? extends RpcServer> rpcServerImpl; 140 141 @Before 142 public void setUpBeforeTest() { 143 CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class); 144 } 145 146 /** 147 * Ensure we do not HAVE TO HAVE a codec. 148 */ 149 @Test 150 public void testNoCodec() throws IOException, ServiceException { 151 Configuration clientConf = new Configuration(CONF); 152 RpcServer rpcServer = createRpcServer("testRpcServer", 153 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 154 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 155 try (AbstractRpcClient<?> client = createRpcClientNoCodec(clientConf)) { 156 rpcServer.start(); 157 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 158 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 159 String message = "hello"; 160 assertEquals(message, 161 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 162 assertNull(pcrc.cellScanner()); 163 } finally { 164 rpcServer.stop(); 165 } 166 } 167 168 protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf); 169 170 /** 171 * It is hard to verify the compression is actually happening under the wraps. Hope that if 172 * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to 173 * confirm that compression is happening down in the client and server). 174 */ 175 @Test 176 public void testCompressCellBlock() throws IOException, ServiceException { 177 Configuration clientConf = new Configuration(CONF); 178 clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); 179 List<Cell> cells = new ArrayList<>(); 180 int count = 3; 181 for (int i = 0; i < count; i++) { 182 cells.add(CELL); 183 } 184 RpcServer rpcServer = createRpcServer("testRpcServer", 185 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 186 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 187 188 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 189 rpcServer.start(); 190 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 191 HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); 192 String message = "hello"; 193 assertEquals(message, 194 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); 195 int index = 0; 196 CellScanner cellScanner = pcrc.cellScanner(); 197 assertNotNull(cellScanner); 198 while (cellScanner.advance()) { 199 assertEquals(CELL, cellScanner.current()); 200 index++; 201 } 202 assertEquals(count, index); 203 } finally { 204 rpcServer.stop(); 205 } 206 } 207 208 protected abstract AbstractRpcClient<?> 209 createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException; 210 211 @Test 212 public void testRTEDuringConnectionSetup() throws Exception { 213 Configuration clientConf = new Configuration(CONF); 214 RpcServer rpcServer = createRpcServer("testRpcServer", 215 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 216 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 217 try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(clientConf)) { 218 rpcServer.start(); 219 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 220 stub.ping(null, EmptyRequestProto.getDefaultInstance()); 221 fail("Expected an exception to have been thrown!"); 222 } catch (Exception e) { 223 LOG.info("Caught expected exception: " + e.toString()); 224 assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault")); 225 } finally { 226 rpcServer.stop(); 227 } 228 } 229 230 /** 231 * Tests that the rpc scheduler is called when requests arrive. 232 */ 233 @Test 234 public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { 235 Configuration clientConf = new Configuration(CONF); 236 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); 237 RpcServer rpcServer = createRpcServer("testRpcServer", 238 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 239 new InetSocketAddress("localhost", 0), CONF, scheduler); 240 verify(scheduler).init(any(RpcScheduler.Context.class)); 241 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 242 rpcServer.start(); 243 verify(scheduler).start(); 244 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 245 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 246 for (int i = 0; i < 10; i++) { 247 stub.echo(null, param); 248 } 249 verify(scheduler, times(10)).dispatch(any(CallRunner.class)); 250 } finally { 251 rpcServer.stop(); 252 verify(scheduler).stop(); 253 } 254 } 255 256 /** Tests that the rpc scheduler is called when requests arrive. */ 257 @Test 258 public void testRpcMaxRequestSize() throws IOException, ServiceException { 259 Configuration clientConf = new Configuration(CONF); 260 clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000); 261 RpcServer rpcServer = createRpcServer("testRpcServer", 262 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 263 new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1)); 264 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 265 rpcServer.start(); 266 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 267 StringBuilder message = new StringBuilder(1200); 268 for (int i = 0; i < 200; i++) { 269 message.append("hello."); 270 } 271 // set total RPC size bigger than 100 bytes 272 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); 273 stub.echo( 274 new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), 275 param); 276 fail("RPC should have failed because it exceeds max request size"); 277 } catch (ServiceException e) { 278 LOG.info("Caught expected exception: " + e); 279 assertTrue(e.toString(), 280 StringUtils.stringifyException(e).contains("RequestTooBigException")); 281 } finally { 282 rpcServer.stop(); 283 } 284 } 285 286 /** 287 * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null 288 * remoteAddress set to its Call Object 289 */ 290 @Test 291 public void testRpcServerForNotNullRemoteAddressInCallObject() 292 throws IOException, ServiceException { 293 Configuration clientConf = new Configuration(CONF); 294 RpcServer rpcServer = createRpcServer("testRpcServer", 295 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 296 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 297 InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); 298 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 299 rpcServer.start(); 300 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 301 assertEquals(localAddr.getAddress().getHostAddress(), 302 stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr()); 303 } finally { 304 rpcServer.stop(); 305 } 306 } 307 308 @Test 309 public void testRemoteError() throws IOException, ServiceException { 310 Configuration clientConf = new Configuration(CONF); 311 RpcServer rpcServer = createRpcServer("testRpcServer", 312 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 313 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 314 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 315 rpcServer.start(); 316 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 317 stub.error(null, EmptyRequestProto.getDefaultInstance()); 318 } catch (ServiceException e) { 319 LOG.info("Caught expected exception: " + e); 320 IOException ioe = ProtobufUtil.handleRemoteException(e); 321 assertTrue(ioe instanceof DoNotRetryIOException); 322 assertTrue(ioe.getMessage().contains("server error!")); 323 } finally { 324 rpcServer.stop(); 325 } 326 } 327 328 @Test 329 public void testTimeout() throws IOException { 330 Configuration clientConf = new Configuration(CONF); 331 RpcServer rpcServer = createRpcServer("testRpcServer", 332 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 333 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 334 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 335 rpcServer.start(); 336 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 337 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 338 int ms = 1000; 339 int timeout = 100; 340 for (int i = 0; i < 10; i++) { 341 pcrc.reset(); 342 pcrc.setCallTimeout(timeout); 343 long startTime = System.nanoTime(); 344 try { 345 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()); 346 } catch (ServiceException e) { 347 long waitTime = (System.nanoTime() - startTime) / 1000000; 348 // expected 349 LOG.info("Caught expected exception: " + e); 350 IOException ioe = ProtobufUtil.handleRemoteException(e); 351 assertTrue(ioe.getCause() instanceof CallTimeoutException); 352 // confirm that we got exception before the actual pause. 353 assertTrue(waitTime < ms); 354 } 355 } 356 } finally { 357 rpcServer.stop(); 358 } 359 } 360 361 private static class FailingSimpleRpcServer extends SimpleRpcServer { 362 363 FailingSimpleRpcServer(Server server, String name, 364 List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress, 365 Configuration conf, RpcScheduler scheduler) throws IOException { 366 super(server, name, services, bindAddress, conf, scheduler, true); 367 } 368 369 final class FailingConnection extends SimpleServerRpcConnection { 370 private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel, 371 long lastContact) { 372 super(rpcServer, channel, lastContact); 373 } 374 375 @Override 376 public void processRequest(ByteBuff buf) throws IOException, InterruptedException { 377 // this will throw exception after the connection header is read, and an RPC is sent 378 // from client 379 throw new DoNotRetryIOException("Failing for test"); 380 } 381 } 382 383 @Override 384 protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) { 385 return new FailingConnection(this, channel, time); 386 } 387 } 388 389 protected RpcServer createTestFailingRpcServer(final String name, 390 final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, 391 Configuration conf, RpcScheduler scheduler) throws IOException { 392 if (rpcServerImpl.equals(NettyRpcServer.class)) { 393 return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler); 394 } else { 395 return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler); 396 } 397 } 398 399 /** Tests that the connection closing is handled by the client with outstanding RPC calls */ 400 @Test 401 public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException { 402 Configuration clientConf = new Configuration(CONF); 403 RpcServer rpcServer = createTestFailingRpcServer("testRpcServer", 404 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 405 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 406 407 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 408 rpcServer.start(); 409 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 410 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); 411 stub.echo(null, param); 412 fail("RPC should have failed because connection closed"); 413 } catch (ServiceException e) { 414 LOG.info("Caught expected exception: " + e.toString()); 415 } finally { 416 rpcServer.stop(); 417 } 418 } 419 420 @Test 421 public void testAsyncEcho() throws IOException { 422 Configuration clientConf = new Configuration(CONF); 423 RpcServer rpcServer = createRpcServer("testRpcServer", 424 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 425 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 426 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 427 rpcServer.start(); 428 Interface stub = newStub(client, rpcServer.getListenerAddress()); 429 int num = 10; 430 List<HBaseRpcController> pcrcList = new ArrayList<>(); 431 List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>(); 432 for (int i = 0; i < num; i++) { 433 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 434 BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>(); 435 stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done); 436 pcrcList.add(pcrc); 437 callbackList.add(done); 438 } 439 for (int i = 0; i < num; i++) { 440 EchoResponseProto resp = callbackList.get(i).get(); 441 HBaseRpcController pcrc = pcrcList.get(i); 442 assertEquals("hello-" + i, resp.getMessage()); 443 assertFalse(pcrc.failed()); 444 assertNull(pcrc.cellScanner()); 445 } 446 } finally { 447 rpcServer.stop(); 448 } 449 } 450 451 @Test 452 public void testAsyncRemoteError() throws IOException { 453 Configuration clientConf = new Configuration(CONF); 454 AbstractRpcClient<?> client = createRpcClient(clientConf); 455 RpcServer rpcServer = createRpcServer("testRpcServer", 456 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 457 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 458 try { 459 rpcServer.start(); 460 Interface stub = newStub(client, rpcServer.getListenerAddress()); 461 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 462 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 463 stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback); 464 assertNull(callback.get()); 465 assertTrue(pcrc.failed()); 466 LOG.info("Caught expected exception: " + pcrc.getFailed()); 467 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 468 assertTrue(ioe instanceof DoNotRetryIOException); 469 assertTrue(ioe.getMessage().contains("server error!")); 470 } finally { 471 client.close(); 472 rpcServer.stop(); 473 } 474 } 475 476 @Test 477 public void testAsyncTimeout() throws IOException { 478 Configuration clientConf = new Configuration(CONF); 479 RpcServer rpcServer = createRpcServer("testRpcServer", 480 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 481 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 482 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 483 rpcServer.start(); 484 Interface stub = newStub(client, rpcServer.getListenerAddress()); 485 List<HBaseRpcController> pcrcList = new ArrayList<>(); 486 List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>(); 487 int ms = 1000; 488 int timeout = 100; 489 long startTime = System.nanoTime(); 490 for (int i = 0; i < 10; i++) { 491 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 492 pcrc.setCallTimeout(timeout); 493 BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>(); 494 stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback); 495 pcrcList.add(pcrc); 496 callbackList.add(callback); 497 } 498 for (BlockingRpcCallback<?> callback : callbackList) { 499 assertNull(callback.get()); 500 } 501 long waitTime = (System.nanoTime() - startTime) / 1000000; 502 for (HBaseRpcController pcrc : pcrcList) { 503 assertTrue(pcrc.failed()); 504 LOG.info("Caught expected exception: " + pcrc.getFailed()); 505 IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed()); 506 assertTrue(ioe.getCause() instanceof CallTimeoutException); 507 } 508 // confirm that we got exception before the actual pause. 509 assertTrue(waitTime < ms); 510 } finally { 511 rpcServer.stop(); 512 } 513 } 514 515 private SpanData waitSpan(Matcher<SpanData> matcher) { 516 Waiter.waitFor(CONF, 1000, 517 new MatcherPredicate<>(() -> traceRule.getSpans(), hasItem(matcher))); 518 return traceRule.getSpans().stream().filter(matcher::matches).findFirst() 519 .orElseThrow(AssertionError::new); 520 } 521 522 private static String buildIpcSpanName(final String packageAndService, final String methodName) { 523 return packageAndService + "/" + methodName; 524 } 525 526 private static Matcher<SpanData> buildIpcClientSpanMatcher(final String packageAndService, 527 final String methodName) { 528 return allOf(hasName(buildIpcSpanName(packageAndService, methodName)), 529 hasKind(SpanKind.CLIENT)); 530 } 531 532 private static Matcher<SpanData> buildIpcServerSpanMatcher(final String packageAndService, 533 final String methodName) { 534 return allOf(hasName(buildIpcSpanName(packageAndService, methodName)), 535 hasKind(SpanKind.SERVER)); 536 } 537 538 private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher( 539 final String packageAndService, final String methodName, final InetSocketAddress isa) { 540 return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"), 541 containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName), 542 containsEntry("net.peer.name", isa.getHostName()), 543 containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort()))); 544 } 545 546 private static Matcher<SpanData> 547 buildIpcServerSpanAttributesMatcher(final String packageAndService, final String methodName) { 548 return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"), 549 containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName))); 550 } 551 552 private void assertRemoteSpan() { 553 SpanData data = waitSpan(hasName("RpcServer.process")); 554 assertTrue(data.getParentSpanContext().isRemote()); 555 assertEquals(SpanKind.SERVER, data.getKind()); 556 } 557 558 @Test 559 public void testTracingSuccessIpc() throws IOException, ServiceException { 560 Configuration clientConf = new Configuration(CONF); 561 RpcServer rpcServer = createRpcServer("testRpcServer", 562 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 563 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 564 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 565 rpcServer.start(); 566 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 567 stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build()); 568 // use the ISA from the running server so that we can get the port selected. 569 final InetSocketAddress isa = rpcServer.getListenerAddress(); 570 final SpanData pauseClientSpan = 571 waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause")); 572 assertThat(pauseClientSpan, 573 buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause", isa)); 574 final SpanData pauseServerSpan = 575 waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause")); 576 assertThat(pauseServerSpan, 577 buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause")); 578 assertRemoteSpan(); 579 assertFalse("no spans provided", traceRule.getSpans().isEmpty()); 580 assertThat(traceRule.getSpans(), 581 everyItem(allOf(hasStatusWithCode(StatusCode.OK), 582 hasTraceId(traceRule.getSpans().iterator().next().getTraceId()), 583 hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L)))))); 584 } 585 } 586 587 @Test 588 public void testTracingErrorIpc() throws IOException { 589 Configuration clientConf = new Configuration(CONF); 590 RpcServer rpcServer = createRpcServer("testRpcServer", 591 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), 592 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 593 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 594 rpcServer.start(); 595 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 596 // use the ISA from the running server so that we can get the port selected. 597 assertThrows(ServiceException.class, 598 () -> stub.error(null, EmptyRequestProto.getDefaultInstance())); 599 final InetSocketAddress isa = rpcServer.getListenerAddress(); 600 final SpanData errorClientSpan = 601 waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error")); 602 assertThat(errorClientSpan, 603 buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa)); 604 final SpanData errorServerSpan = 605 waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error")); 606 assertThat(errorServerSpan, 607 buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error")); 608 assertRemoteSpan(); 609 assertFalse("no spans provided", traceRule.getSpans().isEmpty()); 610 assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR), 611 hasTraceId(traceRule.getSpans().iterator().next().getTraceId())))); 612 } 613 } 614 615 protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf); 616 617 private IOException doBadPreableHeaderCall(BlockingInterface stub) { 618 ServiceException se = assertThrows(ServiceException.class, 619 () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build())); 620 return ProtobufUtil.handleRemoteException(se); 621 } 622 623 @Test 624 public void testBadPreambleHeader() throws Exception { 625 Configuration clientConf = new Configuration(CONF); 626 RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(), 627 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 628 try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) { 629 rpcServer.start(); 630 BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); 631 BadAuthException error = null; 632 // for SimpleRpcServer, it is possible that we get a broken pipe before getting the 633 // BadAuthException, so we add some retries here, see HBASE-28417 634 for (int i = 0; i < 10; i++) { 635 IOException ioe = doBadPreableHeaderCall(stub); 636 if (ioe instanceof BadAuthException) { 637 error = (BadAuthException) ioe; 638 break; 639 } 640 Thread.sleep(100); 641 } 642 assertNotNull("Can not get expected BadAuthException", error); 643 assertThat(error.getMessage(), containsString("authName=unknown")); 644 } finally { 645 rpcServer.stop(); 646 } 647 } 648 649 /** 650 * Testcase for getting connection registry information through connection preamble header, see 651 * HBASE-25051 for more details. 652 */ 653 @Test 654 public void testGetConnectionRegistry() throws IOException, ServiceException { 655 Configuration clientConf = new Configuration(CONF); 656 String clusterId = "test_cluster_id"; 657 HRegionServer server = mock(HRegionServer.class); 658 when(server.getClusterId()).thenReturn(clusterId); 659 // do not need any services 660 RpcServer rpcServer = createRpcServer(server, "testRpcServer", Collections.emptyList(), 661 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 662 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 663 rpcServer.start(); 664 InetSocketAddress addr = rpcServer.getListenerAddress(); 665 BlockingRpcChannel channel = 666 client.createBlockingRpcChannel(ServerName.valueOf(addr.getHostName(), addr.getPort(), 667 EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0); 668 ConnectionRegistryService.BlockingInterface stub = 669 ConnectionRegistryService.newBlockingStub(channel); 670 GetConnectionRegistryResponse resp = 671 stub.getConnectionRegistry(null, GetConnectionRegistryRequest.getDefaultInstance()); 672 assertEquals(clusterId, resp.getClusterId()); 673 } 674 } 675 676 /** 677 * Test server does not support getting connection registry information through connection 678 * preamble header, i.e, a new client connecting to an old server. We simulate this by using a 679 * Server without implementing the ConnectionRegistryEndpoint interface. 680 */ 681 @Test 682 public void testGetConnectionRegistryError() throws IOException, ServiceException { 683 Configuration clientConf = new Configuration(CONF); 684 // do not need any services 685 RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(), 686 new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1)); 687 try (AbstractRpcClient<?> client = createRpcClient(clientConf)) { 688 rpcServer.start(); 689 InetSocketAddress addr = rpcServer.getListenerAddress(); 690 RpcChannel channel = client.createRpcChannel(ServerName.valueOf(addr.getHostName(), 691 addr.getPort(), EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0); 692 ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel); 693 HBaseRpcController pcrc = new HBaseRpcControllerImpl(); 694 BlockingRpcCallback<GetConnectionRegistryResponse> done = new BlockingRpcCallback<>(); 695 stub.getConnectionRegistry(pcrc, GetConnectionRegistryRequest.getDefaultInstance(), done); 696 // should have failed so no response 697 assertNull(done.get()); 698 assertTrue(pcrc.failed()); 699 // should be a FatalConnectionException 700 assertThat(pcrc.getFailed(), instanceOf(RemoteException.class)); 701 assertEquals(FatalConnectionException.class.getName(), 702 ((RemoteException) pcrc.getFailed()).getClassName()); 703 assertThat(pcrc.getFailed().getMessage(), startsWith("Expected HEADER=")); 704 } 705 } 706}