001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
026import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
027import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
028import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
029import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
030import static org.hamcrest.MatcherAssert.assertThat;
031import static org.hamcrest.Matchers.allOf;
032import static org.hamcrest.Matchers.containsString;
033import static org.hamcrest.Matchers.everyItem;
034import static org.hamcrest.Matchers.greaterThanOrEqualTo;
035import static org.hamcrest.Matchers.hasItem;
036import static org.hamcrest.Matchers.instanceOf;
037import static org.hamcrest.Matchers.startsWith;
038import static org.junit.Assert.assertEquals;
039import static org.junit.Assert.assertFalse;
040import static org.junit.Assert.assertNotNull;
041import static org.junit.Assert.assertNull;
042import static org.junit.Assert.assertThrows;
043import static org.junit.Assert.assertTrue;
044import static org.junit.Assert.fail;
045import static org.mockito.ArgumentMatchers.any;
046import static org.mockito.Mockito.mock;
047import static org.mockito.Mockito.spy;
048import static org.mockito.Mockito.verify;
049import static org.mockito.Mockito.when;
050import static org.mockito.internal.verification.VerificationModeFactory.times;
051
052import io.opentelemetry.api.common.AttributeKey;
053import io.opentelemetry.api.trace.SpanKind;
054import io.opentelemetry.api.trace.StatusCode;
055import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
056import io.opentelemetry.sdk.trace.data.SpanData;
057import java.io.IOException;
058import java.net.InetSocketAddress;
059import java.nio.channels.SocketChannel;
060import java.time.Duration;
061import java.util.ArrayList;
062import java.util.Collections;
063import java.util.List;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.hbase.Cell;
066import org.apache.hadoop.hbase.CellScanner;
067import org.apache.hadoop.hbase.CellUtil;
068import org.apache.hadoop.hbase.DoNotRetryIOException;
069import org.apache.hadoop.hbase.HBaseConfiguration;
070import org.apache.hadoop.hbase.KeyValue;
071import org.apache.hadoop.hbase.MatcherPredicate;
072import org.apache.hadoop.hbase.Server;
073import org.apache.hadoop.hbase.ServerName;
074import org.apache.hadoop.hbase.Waiter;
075import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
076import org.apache.hadoop.hbase.nio.ByteBuff;
077import org.apache.hadoop.hbase.regionserver.HRegionServer;
078import org.apache.hadoop.hbase.security.User;
079import org.apache.hadoop.hbase.util.Bytes;
080import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
081import org.apache.hadoop.io.compress.GzipCodec;
082import org.apache.hadoop.ipc.RemoteException;
083import org.apache.hadoop.util.StringUtils;
084import org.hamcrest.Matcher;
085import org.junit.Before;
086import org.junit.Rule;
087import org.junit.Test;
088import org.junit.runners.Parameterized.Parameter;
089import org.slf4j.Logger;
090import org.slf4j.LoggerFactory;
091
092import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
093import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
094import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
095import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
096import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
097
098import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
099import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
100import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
101import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
102import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;
103import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
104import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
105import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
109
110/**
111 * Some basic ipc tests.
112 */
113public abstract class AbstractTestIPC {
114
115  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);
116
117  private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
118  private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
119
120  protected static final Configuration CONF = HBaseConfiguration.create();
121
122  protected RpcServer createRpcServer(Server server, String name,
123    List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
124    RpcScheduler scheduler) throws IOException {
125    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
126  }
127
128  private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
129    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
130    return createRpcServer(null, name, services, bindAddress, conf, scheduler);
131  }
132
133  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
134
135  @Rule
136  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
137
138  @Parameter(0)
139  public Class<? extends RpcServer> rpcServerImpl;
140
141  @Before
142  public void setUpBeforeTest() {
143    CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);
144  }
145
146  /**
147   * Ensure we do not HAVE TO HAVE a codec.
148   */
149  @Test
150  public void testNoCodec() throws IOException, ServiceException {
151    Configuration clientConf = new Configuration(CONF);
152    RpcServer rpcServer = createRpcServer("testRpcServer",
153      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
154      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
155    try (AbstractRpcClient<?> client = createRpcClientNoCodec(clientConf)) {
156      rpcServer.start();
157      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
158      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
159      String message = "hello";
160      assertEquals(message,
161        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
162      assertNull(pcrc.cellScanner());
163    } finally {
164      rpcServer.stop();
165    }
166  }
167
168  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
169
170  /**
171   * It is hard to verify the compression is actually happening under the wraps. Hope that if
172   * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to
173   * confirm that compression is happening down in the client and server).
174   */
175  @Test
176  public void testCompressCellBlock() throws IOException, ServiceException {
177    Configuration clientConf = new Configuration(CONF);
178    clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
179    List<Cell> cells = new ArrayList<>();
180    int count = 3;
181    for (int i = 0; i < count; i++) {
182      cells.add(CELL);
183    }
184    RpcServer rpcServer = createRpcServer("testRpcServer",
185      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
186      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
187
188    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
189      rpcServer.start();
190      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
191      HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
192      String message = "hello";
193      assertEquals(message,
194        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());
195      int index = 0;
196      CellScanner cellScanner = pcrc.cellScanner();
197      assertNotNull(cellScanner);
198      while (cellScanner.advance()) {
199        assertEquals(CELL, cellScanner.current());
200        index++;
201      }
202      assertEquals(count, index);
203    } finally {
204      rpcServer.stop();
205    }
206  }
207
208  protected abstract AbstractRpcClient<?>
209    createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException;
210
211  @Test
212  public void testRTEDuringConnectionSetup() throws Exception {
213    Configuration clientConf = new Configuration(CONF);
214    RpcServer rpcServer = createRpcServer("testRpcServer",
215      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
216      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
217    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(clientConf)) {
218      rpcServer.start();
219      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
220      stub.ping(null, EmptyRequestProto.getDefaultInstance());
221      fail("Expected an exception to have been thrown!");
222    } catch (Exception e) {
223      LOG.info("Caught expected exception: " + e.toString());
224      assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
225    } finally {
226      rpcServer.stop();
227    }
228  }
229
230  /**
231   * Tests that the rpc scheduler is called when requests arrive.
232   */
233  @Test
234  public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
235    Configuration clientConf = new Configuration(CONF);
236    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
237    RpcServer rpcServer = createRpcServer("testRpcServer",
238      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
239      new InetSocketAddress("localhost", 0), CONF, scheduler);
240    verify(scheduler).init(any(RpcScheduler.Context.class));
241    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
242      rpcServer.start();
243      verify(scheduler).start();
244      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
245      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
246      for (int i = 0; i < 10; i++) {
247        stub.echo(null, param);
248      }
249      verify(scheduler, times(10)).dispatch(any(CallRunner.class));
250    } finally {
251      rpcServer.stop();
252      verify(scheduler).stop();
253    }
254  }
255
256  /** Tests that the rpc scheduler is called when requests arrive. */
257  @Test
258  public void testRpcMaxRequestSize() throws IOException, ServiceException {
259    Configuration clientConf = new Configuration(CONF);
260    clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);
261    RpcServer rpcServer = createRpcServer("testRpcServer",
262      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
263      new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1));
264    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
265      rpcServer.start();
266      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
267      StringBuilder message = new StringBuilder(1200);
268      for (int i = 0; i < 200; i++) {
269        message.append("hello.");
270      }
271      // set total RPC size bigger than 100 bytes
272      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();
273      stub.echo(
274        new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))),
275        param);
276      fail("RPC should have failed because it exceeds max request size");
277    } catch (ServiceException e) {
278      LOG.info("Caught expected exception: " + e);
279      assertTrue(e.toString(),
280        StringUtils.stringifyException(e).contains("RequestTooBigException"));
281    } finally {
282      rpcServer.stop();
283    }
284  }
285
286  /**
287   * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
288   * remoteAddress set to its Call Object
289   */
290  @Test
291  public void testRpcServerForNotNullRemoteAddressInCallObject()
292    throws IOException, ServiceException {
293    Configuration clientConf = new Configuration(CONF);
294    RpcServer rpcServer = createRpcServer("testRpcServer",
295      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
296      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
297    InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
298    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
299      rpcServer.start();
300      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
301      assertEquals(localAddr.getAddress().getHostAddress(),
302        stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());
303    } finally {
304      rpcServer.stop();
305    }
306  }
307
308  @Test
309  public void testRemoteError() throws IOException, ServiceException {
310    Configuration clientConf = new Configuration(CONF);
311    RpcServer rpcServer = createRpcServer("testRpcServer",
312      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
313      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
314    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
315      rpcServer.start();
316      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
317      stub.error(null, EmptyRequestProto.getDefaultInstance());
318    } catch (ServiceException e) {
319      LOG.info("Caught expected exception: " + e);
320      IOException ioe = ProtobufUtil.handleRemoteException(e);
321      assertTrue(ioe instanceof DoNotRetryIOException);
322      assertTrue(ioe.getMessage().contains("server error!"));
323    } finally {
324      rpcServer.stop();
325    }
326  }
327
328  @Test
329  public void testTimeout() throws IOException {
330    Configuration clientConf = new Configuration(CONF);
331    RpcServer rpcServer = createRpcServer("testRpcServer",
332      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
333      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
334    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
335      rpcServer.start();
336      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
337      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
338      int ms = 1000;
339      int timeout = 100;
340      for (int i = 0; i < 10; i++) {
341        pcrc.reset();
342        pcrc.setCallTimeout(timeout);
343        long startTime = System.nanoTime();
344        try {
345          stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());
346        } catch (ServiceException e) {
347          long waitTime = (System.nanoTime() - startTime) / 1000000;
348          // expected
349          LOG.info("Caught expected exception: " + e);
350          IOException ioe = ProtobufUtil.handleRemoteException(e);
351          assertTrue(ioe.getCause() instanceof CallTimeoutException);
352          // confirm that we got exception before the actual pause.
353          assertTrue(waitTime < ms);
354        }
355      }
356    } finally {
357      rpcServer.stop();
358    }
359  }
360
361  private static class FailingSimpleRpcServer extends SimpleRpcServer {
362
363    FailingSimpleRpcServer(Server server, String name,
364      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress bindAddress,
365      Configuration conf, RpcScheduler scheduler) throws IOException {
366      super(server, name, services, bindAddress, conf, scheduler, true);
367    }
368
369    final class FailingConnection extends SimpleServerRpcConnection {
370      private FailingConnection(FailingSimpleRpcServer rpcServer, SocketChannel channel,
371        long lastContact) {
372        super(rpcServer, channel, lastContact);
373      }
374
375      @Override
376      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {
377        // this will throw exception after the connection header is read, and an RPC is sent
378        // from client
379        throw new DoNotRetryIOException("Failing for test");
380      }
381    }
382
383    @Override
384    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {
385      return new FailingConnection(this, channel, time);
386    }
387  }
388
389  protected RpcServer createTestFailingRpcServer(final String name,
390    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
391    Configuration conf, RpcScheduler scheduler) throws IOException {
392    if (rpcServerImpl.equals(NettyRpcServer.class)) {
393      return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);
394    } else {
395      return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);
396    }
397  }
398
399  /** Tests that the connection closing is handled by the client with outstanding RPC calls */
400  @Test
401  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
402    Configuration clientConf = new Configuration(CONF);
403    RpcServer rpcServer = createTestFailingRpcServer("testRpcServer",
404      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
405      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
406
407    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
408      rpcServer.start();
409      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
410      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
411      stub.echo(null, param);
412      fail("RPC should have failed because connection closed");
413    } catch (ServiceException e) {
414      LOG.info("Caught expected exception: " + e.toString());
415    } finally {
416      rpcServer.stop();
417    }
418  }
419
420  @Test
421  public void testAsyncEcho() throws IOException {
422    Configuration clientConf = new Configuration(CONF);
423    RpcServer rpcServer = createRpcServer("testRpcServer",
424      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
425      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
426    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
427      rpcServer.start();
428      Interface stub = newStub(client, rpcServer.getListenerAddress());
429      int num = 10;
430      List<HBaseRpcController> pcrcList = new ArrayList<>();
431      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
432      for (int i = 0; i < num; i++) {
433        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
434        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
435        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
436        pcrcList.add(pcrc);
437        callbackList.add(done);
438      }
439      for (int i = 0; i < num; i++) {
440        EchoResponseProto resp = callbackList.get(i).get();
441        HBaseRpcController pcrc = pcrcList.get(i);
442        assertEquals("hello-" + i, resp.getMessage());
443        assertFalse(pcrc.failed());
444        assertNull(pcrc.cellScanner());
445      }
446    } finally {
447      rpcServer.stop();
448    }
449  }
450
451  @Test
452  public void testAsyncRemoteError() throws IOException {
453    Configuration clientConf = new Configuration(CONF);
454    AbstractRpcClient<?> client = createRpcClient(clientConf);
455    RpcServer rpcServer = createRpcServer("testRpcServer",
456      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
457      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
458    try {
459      rpcServer.start();
460      Interface stub = newStub(client, rpcServer.getListenerAddress());
461      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
462      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
463      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
464      assertNull(callback.get());
465      assertTrue(pcrc.failed());
466      LOG.info("Caught expected exception: " + pcrc.getFailed());
467      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
468      assertTrue(ioe instanceof DoNotRetryIOException);
469      assertTrue(ioe.getMessage().contains("server error!"));
470    } finally {
471      client.close();
472      rpcServer.stop();
473    }
474  }
475
476  @Test
477  public void testAsyncTimeout() throws IOException {
478    Configuration clientConf = new Configuration(CONF);
479    RpcServer rpcServer = createRpcServer("testRpcServer",
480      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
481      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
482    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
483      rpcServer.start();
484      Interface stub = newStub(client, rpcServer.getListenerAddress());
485      List<HBaseRpcController> pcrcList = new ArrayList<>();
486      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
487      int ms = 1000;
488      int timeout = 100;
489      long startTime = System.nanoTime();
490      for (int i = 0; i < 10; i++) {
491        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
492        pcrc.setCallTimeout(timeout);
493        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
494        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
495        pcrcList.add(pcrc);
496        callbackList.add(callback);
497      }
498      for (BlockingRpcCallback<?> callback : callbackList) {
499        assertNull(callback.get());
500      }
501      long waitTime = (System.nanoTime() - startTime) / 1000000;
502      for (HBaseRpcController pcrc : pcrcList) {
503        assertTrue(pcrc.failed());
504        LOG.info("Caught expected exception: " + pcrc.getFailed());
505        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
506        assertTrue(ioe.getCause() instanceof CallTimeoutException);
507      }
508      // confirm that we got exception before the actual pause.
509      assertTrue(waitTime < ms);
510    } finally {
511      rpcServer.stop();
512    }
513  }
514
515  private SpanData waitSpan(Matcher<SpanData> matcher) {
516    Waiter.waitFor(CONF, 1000,
517      new MatcherPredicate<>(() -> traceRule.getSpans(), hasItem(matcher)));
518    return traceRule.getSpans().stream().filter(matcher::matches).findFirst()
519      .orElseThrow(AssertionError::new);
520  }
521
522  private static String buildIpcSpanName(final String packageAndService, final String methodName) {
523    return packageAndService + "/" + methodName;
524  }
525
526  private static Matcher<SpanData> buildIpcClientSpanMatcher(final String packageAndService,
527    final String methodName) {
528    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
529      hasKind(SpanKind.CLIENT));
530  }
531
532  private static Matcher<SpanData> buildIpcServerSpanMatcher(final String packageAndService,
533    final String methodName) {
534    return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),
535      hasKind(SpanKind.SERVER));
536  }
537
538  private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
539    final String packageAndService, final String methodName, final InetSocketAddress isa) {
540    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
541      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName),
542      containsEntry("net.peer.name", isa.getHostName()),
543      containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
544  }
545
546  private static Matcher<SpanData>
547    buildIpcServerSpanAttributesMatcher(final String packageAndService, final String methodName) {
548    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"),
549      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName)));
550  }
551
552  private void assertRemoteSpan() {
553    SpanData data = waitSpan(hasName("RpcServer.process"));
554    assertTrue(data.getParentSpanContext().isRemote());
555    assertEquals(SpanKind.SERVER, data.getKind());
556  }
557
558  @Test
559  public void testTracingSuccessIpc() throws IOException, ServiceException {
560    Configuration clientConf = new Configuration(CONF);
561    RpcServer rpcServer = createRpcServer("testRpcServer",
562      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
563      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
564    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
565      rpcServer.start();
566      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
567      stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
568      // use the ISA from the running server so that we can get the port selected.
569      final InetSocketAddress isa = rpcServer.getListenerAddress();
570      final SpanData pauseClientSpan =
571        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
572      assertThat(pauseClientSpan,
573        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause", isa));
574      final SpanData pauseServerSpan =
575        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
576      assertThat(pauseServerSpan,
577        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "pause"));
578      assertRemoteSpan();
579      assertFalse("no spans provided", traceRule.getSpans().isEmpty());
580      assertThat(traceRule.getSpans(),
581        everyItem(allOf(hasStatusWithCode(StatusCode.OK),
582          hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
583          hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
584    }
585  }
586
587  @Test
588  public void testTracingErrorIpc() throws IOException {
589    Configuration clientConf = new Configuration(CONF);
590    RpcServer rpcServer = createRpcServer("testRpcServer",
591      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
592      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
593    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
594      rpcServer.start();
595      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
596      // use the ISA from the running server so that we can get the port selected.
597      assertThrows(ServiceException.class,
598        () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
599      final InetSocketAddress isa = rpcServer.getListenerAddress();
600      final SpanData errorClientSpan =
601        waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
602      assertThat(errorClientSpan,
603        buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa));
604      final SpanData errorServerSpan =
605        waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
606      assertThat(errorServerSpan,
607        buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));
608      assertRemoteSpan();
609      assertFalse("no spans provided", traceRule.getSpans().isEmpty());
610      assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),
611        hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
612    }
613  }
614
615  protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf);
616
617  private IOException doBadPreableHeaderCall(BlockingInterface stub) {
618    ServiceException se = assertThrows(ServiceException.class,
619      () -> stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));
620    return ProtobufUtil.handleRemoteException(se);
621  }
622
623  @Test
624  public void testBadPreambleHeader() throws Exception {
625    Configuration clientConf = new Configuration(CONF);
626    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
627      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
628    try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
629      rpcServer.start();
630      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
631      BadAuthException error = null;
632      // for SimpleRpcServer, it is possible that we get a broken pipe before getting the
633      // BadAuthException, so we add some retries here, see HBASE-28417
634      for (int i = 0; i < 10; i++) {
635        IOException ioe = doBadPreableHeaderCall(stub);
636        if (ioe instanceof BadAuthException) {
637          error = (BadAuthException) ioe;
638          break;
639        }
640        Thread.sleep(100);
641      }
642      assertNotNull("Can not get expected BadAuthException", error);
643      assertThat(error.getMessage(), containsString("authName=unknown"));
644    } finally {
645      rpcServer.stop();
646    }
647  }
648
649  /**
650   * Testcase for getting connection registry information through connection preamble header, see
651   * HBASE-25051 for more details.
652   */
653  @Test
654  public void testGetConnectionRegistry() throws IOException, ServiceException {
655    Configuration clientConf = new Configuration(CONF);
656    String clusterId = "test_cluster_id";
657    HRegionServer server = mock(HRegionServer.class);
658    when(server.getClusterId()).thenReturn(clusterId);
659    // do not need any services
660    RpcServer rpcServer = createRpcServer(server, "testRpcServer", Collections.emptyList(),
661      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
662    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
663      rpcServer.start();
664      InetSocketAddress addr = rpcServer.getListenerAddress();
665      BlockingRpcChannel channel =
666        client.createBlockingRpcChannel(ServerName.valueOf(addr.getHostName(), addr.getPort(),
667          EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
668      ConnectionRegistryService.BlockingInterface stub =
669        ConnectionRegistryService.newBlockingStub(channel);
670      GetConnectionRegistryResponse resp =
671        stub.getConnectionRegistry(null, GetConnectionRegistryRequest.getDefaultInstance());
672      assertEquals(clusterId, resp.getClusterId());
673    }
674  }
675
676  /**
677   * Test server does not support getting connection registry information through connection
678   * preamble header, i.e, a new client connecting to an old server. We simulate this by using a
679   * Server without implementing the ConnectionRegistryEndpoint interface.
680   */
681  @Test
682  public void testGetConnectionRegistryError() throws IOException, ServiceException {
683    Configuration clientConf = new Configuration(CONF);
684    // do not need any services
685    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
686      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
687    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
688      rpcServer.start();
689      InetSocketAddress addr = rpcServer.getListenerAddress();
690      RpcChannel channel = client.createRpcChannel(ServerName.valueOf(addr.getHostName(),
691        addr.getPort(), EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
692      ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
693      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
694      BlockingRpcCallback<GetConnectionRegistryResponse> done = new BlockingRpcCallback<>();
695      stub.getConnectionRegistry(pcrc, GetConnectionRegistryRequest.getDefaultInstance(), done);
696      // should have failed so no response
697      assertNull(done.get());
698      assertTrue(pcrc.failed());
699      // should be a FatalConnectionException
700      assertThat(pcrc.getFailed(), instanceOf(RemoteException.class));
701      assertEquals(FatalConnectionException.class.getName(),
702        ((RemoteException) pcrc.getFailed()).getClassName());
703      assertThat(pcrc.getFailed().getMessage(), startsWith("Expected HEADER="));
704    }
705  }
706}